package org.zjvis.datascience.service;

import static org.zjvis.datascience.common.util.FileImportUtil.close;
import static org.zjvis.datascience.common.util.FileImportUtil.readSheet;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.CharUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Entity;
import com.alibaba.fastjson.JSONObject;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.sql.DataSource;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.ooxml.POIXMLException;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import org.zjvis.datascience.common.constant.Constant;
import org.zjvis.datascience.common.constant.DatabaseConstant;
import org.zjvis.datascience.common.constant.DatasetConstant;
import org.zjvis.datascience.common.constant.FileConstant;
import org.zjvis.datascience.common.dto.DatasetImportResDTO;
import org.zjvis.datascience.common.dto.FileUploadDTO;
import org.zjvis.datascience.common.dto.FileUploadRequestDTO;
import org.zjvis.datascience.common.dto.dataset.DatasetColumnDTO;
import org.zjvis.datascience.common.dto.dataset.DatasetFileWriteDTO;
import org.zjvis.datascience.common.dto.dataset.DatasetJsonInfo;
import org.zjvis.datascience.common.enums.DataTypeEnum;
import org.zjvis.datascience.common.enums.DateTimeFormatEnum;
import org.zjvis.datascience.common.enums.DateTimeFormatTypeEnum;
import org.zjvis.datascience.common.enums.FileCheckMd5Status;
import org.zjvis.datascience.common.enums.FileTypeEnum;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.model.ApiResultCode;
import org.zjvis.datascience.common.pool.ExcelPool;
import org.zjvis.datascience.common.util.DatasetUtil;
import org.zjvis.datascience.common.util.FileImportUtil;
import org.zjvis.datascience.common.util.FileUtil;
import org.zjvis.datascience.common.util.JwtUtil;
import org.zjvis.datascience.common.util.Preconditions;
import org.zjvis.datascience.common.util.RedisUtil;
import org.zjvis.datascience.common.util.SqlUtil;
import org.zjvis.datascience.common.util.db.CryptoUtil;
import org.zjvis.datascience.common.util.db.DataTypeValidator;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.common.vo.PreviewDatasetListVO;
import org.zjvis.datascience.common.vo.dataset.DatasetAndHeadVO;
import org.zjvis.datascience.common.vo.dataset.DatasetColInfoVO;
import org.zjvis.datascience.common.vo.dataset.DatasetFileWriteFileVO;
import org.zjvis.datascience.common.vo.dataset.DatasetFileWriteVO;
import org.zjvis.datascience.common.vo.dataset.DatasetFilesWriteVO;
import org.zjvis.datascience.common.vo.dataset.HeadVO;
import org.zjvis.datascience.common.vo.dataset.PreviewDatasetVO;
import org.zjvis.datascience.service.csv.CsvSemiotic;
import org.zjvis.datascience.service.csv.ErrorLineCode;
import org.zjvis.datascience.service.csv.ParseFile;
import org.zjvis.datascience.service.csv.State;
import org.zjvis.datascience.service.csv.dto.CsvParseInfo;
import org.zjvis.datascience.service.csv.dto.ErrorInfo;
import org.zjvis.datascience.service.csv.dto.ErrorLineCodeInfo;
import org.zjvis.datascience.service.csv.dto.ParseInfo;
import org.zjvis.datascience.service.csv.pool.CsvParsePool;
import org.zjvis.datascience.service.csv.semiotic.Escape;
import org.zjvis.datascience.service.csv.semiotic.Quote;
import org.zjvis.datascience.service.csv.semiotic.Separate;
import org.zjvis.datascience.service.csv.slice.CsvReader;
import org.zjvis.datascience.service.dataset.DatasetService;
import org.zjvis.datascience.service.dataset.ImportDataService;

/**
 * @description SFTP connection service: uploads csv/yml files to the Greenplum
 * server, drives gpload imports, and handles masking and file cleanup.
 * @date 2021-12-21
 */
@Component
public class SftpConnectService {

    // SFTP login username
    @Value("${sftp.username}")
    private String userName;
    // SFTP login password
    @Value("${sftp.password}")
    private String password;
    // SFTP server host / IP address
    @Value("${sftp.host}")
    private String host;
    // SFTP port
    @Value("${sftp.port}")
    private int port;
    /**
     * Remote file path on the server; must end with a trailing '/'.
     */
    @Value("${sftp.filepath}")
    private String filepath;

    @Value("${sftp.tempfilepath}")
    private String tempfilepath;
    // Greenplum database port
    @Value("${greenplum.port}")
    private int gpPort;
    // Greenplum JDBC connection URL
    @Value("${greenplum.url}")
    private String gpUrl;
    // Greenplum database username
    @Value("${greenplum.username}")
    private String gpUsername;
    // Greenplum database password
    @Value("${greenplum.password}")
    private String gpPassword;
    @Value("${greenplum.master-host-name}")
    private String gpMasterHostName;
    @Value("${greenplum.gpload.version}")
    private String gpLoadVersion;
    @Value("${greenplum.gpload.local-host-name}")
    private String gpLoadLocalHostName;
    @Value("${greenplum.gpload.port-range}")
    private String gpLoadPortRange;
    @Value("${greenplum.gpload.database}")
    private String gpLoadDatabase;

    @Value("${app.large-file-threshold}")
    private String bigFileThreshold;
    @Value("${upload.folderpath}")
    private String sliceFolderPath;
    @Value("${upload.chunkSize}")
    private long sliceChunkSize;
    @Value("${upload.expireTime}")
    private long expireTime;


    // Shared SSH session to the GP server; (re)initialized by initSession().
    private Session yamlSshSession = null;

    public static final String CHANNEL_TYPE_SFTP = "sftp";
    public static final String CHANNEL_TYPE_EXEC = "exec";
    // Random-name length for generated gpload yml control files.
    private static final int YML_FILE_NAME_SIZE = 8;
    private static final int FILE_RANDOM_NAME_SIZE = 5;
    private static final int PRE_VALIDATE_DATA_SIZE = 21;
    private static final int BlockSize = 83886080;  // each block is at most 80 MB
    private static final int MAXJUDGE = 100;
    private static final long INDEX_ROW_THRESHOLD = 10000; // tables over 10000 rows get an index on record_id

    // NOTE(review): public mutable static — other classes may reassign it; confirm before tightening.
    public static String SEPARATOR = DatasetConstant.DEFAULT_SEPARATOR;

    private final static Logger logger = LoggerFactory.getLogger(SftpConnectService.class);

    @Autowired
    SemanticService semanticService;

    @Autowired
    private ExcelPool threadPool;

    @Autowired
    private CsvParsePool csvParsePool;

    @Autowired
    private DatasetService datasetService;

    @Autowired
    RedisUtil redisUtil;

    /** Opens the SSH session to the GP server once this bean is constructed. */
    @PostConstruct
    public void init() {
        initSession();
    }

    /** Releases the SSH session when this bean is destroyed. */
    @PreDestroy
    public void destroy() {
        clearSessionAndChannel();
    }

    /**
     * Disconnects the cached SSH session if one exists. Safe to call
     * repeatedly; a null session is simply ignored.
     */
    public void clearSessionAndChannel() {
        Session session = this.yamlSshSession;
        if (session != null) {
            session.disconnect();
        }
    }

    /**
     * Creates and connects a fresh SSH session to the configured GP server.
     * Strict host-key checking is disabled. A connection failure is logged
     * but not rethrown (the field may be left holding a disconnected session,
     * which callers handle via their reconnect loops).
     */
    public void initSession() {
        try {
            yamlSshSession = new JSch().getSession(userName, host, port);
            yamlSshSession.setPassword(password);
            // Skip known_hosts verification for the internal GP host.
            Properties sessionConfig = new Properties();
            sessionConfig.put("StrictHostKeyChecking", "no");
            yamlSshSession.setConfig(sessionConfig);
            yamlSshSession.connect();
            logger.info("yaml connect gp server success");
        } catch (JSchException e) {
            logger.error("yaml connect gp server failed", e);
        }
    }

//    public void initYamlChannel() {
//        try {
//            yamlSftp = (ChannelSftp) yamlSshSession.openChannel(CHANNEL_TYPE_SFTP);
//            yamlSftp.connect();
//        } catch (JSchException e) {
//            logger.error("open sftp channel failed", e);
//        }
//    }


    /**
     * Runs gpload on the server against a previously uploaded yml/csv pair,
     * parses the gpload log to judge success, then performs masking / old-table
     * drop and, for large tables, index creation.
     *
     * Fixes vs. original: the exec channel is now always disconnected in a
     * finally block (it previously leaked on several exception paths), the
     * log reader uses StandardCharsets.UTF_8 instead of the string "utf-8",
     * and the mis-cased local {@code Inserted} was removed.
     *
     * @param importResDTO result holder updated with inserted / format-error counts
     * @param ymlFileName  gpload control file name (already uploaded to filepath)
     * @param csvFileName  data file name (already uploaded to filepath)
     * @param maskingSql   optional masking SQL executed after the load
     * @param dropSql      optional SQL dropping the old table after masking
     * @param targetTable  table being loaded; dropped on fatal import errors
     * @param firstImport  true when this is the initial import of the dataset
     * @throws JSchException if the exec channel cannot be opened
     */
    private void execute(DatasetImportResDTO importResDTO, String ymlFileName, String csvFileName,
        String maskingSql, String dropSql, String targetTable, Boolean firstImport)
        throws JSchException {
        ChannelExec channelExec = (ChannelExec) yamlSshSession.openChannel(CHANNEL_TYPE_EXEC);
        if (channelExec == null) {
            throw new DataScienceException(BaseErrorCode.DATASET_IMPORT_CONNECT_GP_SERVER_ERROR);
        }
        try (InputStream in = channelExec.getInputStream();
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {

            channelExec.setErrStream(System.err);
            channelExec.setCommand("gpload -f " + filepath + ymlFileName);
            channelExec.connect();

            // Capture the full gpload output for later pattern matching.
            StringBuilder log = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                logger.info(line);
                log.append(line).append(" ");
            }
            logger.info("command exec exit status:{}", channelExec.getExitStatus());

            String logInfo = log.toString();
            Matcher matcher = Pattern.compile(DatasetConstant.GPLOAD_STATUS_REGEX).matcher(logInfo);
            long tableRows = 0;
            if (matcher.find()) {
                // group(1) = inserted rows, group(2) = format errors, group(3) = status word.
                if ("succeeded".equals(matcher.group(3))
                    && (Integer.parseInt(matcher.group(2)) > 0 || Integer.parseInt(matcher.group(1)) == 0)) {
                    if (firstImport) {
                        // Bad format / empty data on the very first import:
                        // drop the half-created table, clean remote files, abort.
                        dropTable(targetTable);
                        clearYmlAndCsvFile(csvFileName, ymlFileName);
                        throw DataScienceException
                            .of(BaseErrorCode.DATASET_IMPORT_CSV_GPLOAD_ERROR, null, "文件格式有误或者数据为空，请确认");
                    } else {
                        // Partial success on an append import: record the counts.
                        importResDTO.setInserted(Long.parseLong(matcher.group(1)));
                        importResDTO.setFormatErrors(Long.parseLong(matcher.group(2)));
                        importResDTO.setSuccessed(false);
                    }
                }
                tableRows = Long.parseLong(matcher.group(1));
            }
            if (channelExec.getExitStatus() > 0) {
                clearYml(ymlFileName);
                try {
                    analyseGPLoadErrorMsg(logInfo);
                } catch (JSchException e) {
                    // Analysis itself failed: drop the table and surface a generic error.
                    dropTable(targetTable);
                    throw new DataScienceException("Upload file error.", e);
                }
            }

            // Post-load processing: data masking and old-table cleanup.
            masking(maskingSql, dropSql);

            // Large tables get an index on record_id to speed up later queries.
            if (tableRows > INDEX_ROW_THRESHOLD) {
                createIndex(targetTable);
            }

            clearYmlAndCsvFile(csvFileName, ymlFileName);
        } catch (IOException e) {
            throw new DataScienceException(BaseErrorCode.DATASET_IMPORT_CONNECT_GP_SERVER_ERROR, e);
        } finally {
            // Always release the channel, even on the exception paths above.
            channelExec.disconnect();
        }
    }

    /**
     * Inspects a failed gpload log and throws a DataScienceException describing
     * the most specific matched cause: reject-limit reached, numeric conversion
     * failure, date conversion failure, or missing server file. Falls back to
     * the raw matched message (or the full log when nothing matches). This
     * method always throws.
     *
     * @param logErrMsg full gpload output captured from the exec channel
     */
    public void analyseGPLoadErrorMsg(String logErrMsg) throws JSchException {
        Matcher topMatcher = Pattern.compile(DatasetConstant.GPLOAD_ERROR_REGEX).matcher(logErrMsg);
        if (!topMatcher.find()) {
            // No recognizable error line at all: report the raw log.
            throw DataScienceException.of(BaseErrorCode.DATASET_IMPORT_CSV_GPLOAD_ERROR, null, logErrMsg);
        }
        String errMsg = topMatcher.group(1);
        if (errMsg.matches(DatasetConstant.GPLOAD_REJECT_LIMIT_ERROR)) {
            Matcher detail = Pattern.compile(DatasetConstant.GPLOAD_ERROR_DETAIL_REGEX).matcher(errMsg);
            String res = detail.find() ? detail.group(1) : errMsg;
            throw DataScienceException.of(BaseErrorCode.DATASET_IMPORT_CSV_GPLOAD_ERROR, null, res);
        }
        if (errMsg.matches(DatasetConstant.GPLOAD_ERROR_NUMBERIC_REGEX)) {
            Matcher numeric = Pattern.compile("numeric:(.*)\\(seg").matcher(errMsg);
            String value = numeric.find() ? numeric.group(1) : errMsg;
            throw DataScienceException.of(BaseErrorCode.DATASET_IMPORT_CSV_GPLOAD_ERROR_TYPENUM,
                null, StrUtil.format("值：{} 无法转换为数字类型", value));
        }
        if (errMsg.matches(DatasetConstant.GPLOAD_ERROR_DATE_TIME_REGEX)) {
            String badValue = errMsg.split(":")[1];
            if (StringUtils.isNotBlank(badValue)) {
                badValue = badValue.split("\\(")[0];
            }
            throw DataScienceException.of(BaseErrorCode.DATASET_IMPORT_CSV_GPLOAD_ERROR_TYPEDATE,
                null, StrUtil.format("值：{} 无法转换为日期类型", badValue));
        }
        if (errMsg.matches(DatasetConstant.GPLOAD_ERROR_FILE_NOT_FOUNT_REGEX)) {
            throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_NOT_FOUND,
                null, "服务器文件丢失，请尝试重新上传");
        }
        // Recognized error line but no specific category: report it as-is.
        throw DataScienceException.of(BaseErrorCode.DATASET_IMPORT_CSV_GPLOAD_ERROR, null, errMsg);
    }

    /**
     * Executes the masking SQL (up to 3 attempts on SQLException) and, when it
     * succeeds, drops the old table. A failed drop is only logged because a
     * scheduled job later cleans up orphaned tables.
     *
     * Fixes vs. original: both log statements had broken format usage that
     * silently dropped the exception message argument; {@code maskRes} now
     * uses the primitive {@code boolean}.
     *
     * @param maskingSql masking SQL; when blank the whole method is a no-op
     * @param dropSql    SQL dropping the old table; may be blank
     */
    public void masking(String maskingSql, String dropSql) {
        if (StringUtils.isBlank(maskingSql)) {
            return;
        }
        DataSource ds = JDBCUtil.getDataSource(gpUrl, gpUsername, gpPassword);
        boolean maskRes = false;
        int attempt = 1;
        while (true) {
            try {
                maskRes = JDBCUtil.execute(ds.getConnection(), maskingSql);
                break;
            } catch (SQLException e) {
                logger.error(String.format("第%s次脱敏失败:%s", attempt, e.getMessage()), e);
                // Retry twice; give up on the third failure.
                if (attempt == 3) {
                    throw DataScienceException
                        .of(BaseErrorCode.DATASET_IMPORT_MASKING_ERROR, null, "数据脱敏异常");
                }
            } finally {
                attempt++;
            }
        }
        // Drop the old table only after masking succeeded.
        if (maskRes && StringUtils.isNotBlank(dropSql)) {
            try {
                JDBCUtil.execute(ds.getConnection(), dropSql);
            } catch (SQLException e) {
                // Intentionally swallowed: a scheduled job removes stale tables.
                logger.warn("删除旧表异常: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * Deletes the given remote files over SFTP, first reconnecting the shared
     * SSH session if it dropped (up to 5 attempts, rebuilding the session on
     * each failed attempt). Failures are logged, never rethrown.
     *
     * Fixes vs. original: the error log passed the {@code String[]} varargs
     * array directly to String.format (which expands the array into format
     * arguments and breaks for multiple files); it now uses Arrays.toString.
     * The SFTP channel is disconnected in a finally block so it no longer
     * leaks when {@code rm()} throws.
     *
     * @param files absolute remote paths to delete
     */
    public void clearFile(String... files) {
        try {
            // Reconnect the shared session if it dropped; rebuild it entirely
            // after a failed reconnect attempt.
            AtomicInteger retryTimes = new AtomicInteger(0);
            while (!yamlSshSession.isConnected()) {
                try {
                    if (retryTimes.incrementAndGet() > 5) {
                        throw new DataScienceException(String.format(
                                "session reconnect retryTimes(%s) exceed max attempts",
                                retryTimes.get()));
                    }
                    yamlSshSession.connect();
                } catch (JSchException jSchException) {
                    logger.warn(
                            String.format(
                                    "yamlSshSession reconnect failed, try to init a new session: %s",
                                    jSchException.getMessage()),
                            jSchException);
                    yamlSshSession.disconnect();
                    initSession();
                }
            }
            ChannelSftp yamlSftp = (ChannelSftp) yamlSshSession.openChannel(CHANNEL_TYPE_SFTP);
            yamlSftp.connect();
            try {
                for (String file : files) {
                    yamlSftp.rm(file);
                }
            } finally {
                yamlSftp.disconnect();
            }
        } catch (SftpException | JSchException e) {
            logger.warn(String.format("clear files:%s error", Arrays.toString(files)), e);
        }
    }

    /** Deletes a single gpload yml control file from the remote file path. */
    public void clearYml(String ymlFileName) {
        String remoteYml = filepath + ymlFileName;
        clearFile(remoteYml);
    }

    /**
     * Deletes both the csv data file (with all whitespace stripped from its
     * name) and the yml control file from the remote file path.
     */
    public void clearYmlAndCsvFile(String csvFileName, String ymlFileName) {
        clearFile(filepath + csvFileName.replaceAll("\\s*", ""), filepath + ymlFileName);
    }

    /***
     * Uploads a multipart csv file to the configured remote path as
     * "&lt;fileNameWithoutExt&gt;.csv", first reconnecting the shared SSH
     * session if needed (up to 5 attempts).
     *
     * Fixes vs. original: the multipart input stream is now closed
     * (try-with-resources) and the SFTP channel is disconnected in a finally
     * block, so neither leaks when the upload fails.
     *
     * @param multipartFile      uploaded file content
     * @param fileNameWithoutExt remote file name without extension
     * @param charSet            declared charset (unused in this overload)
     */
    public void uploadFile(MultipartFile multipartFile, String fileNameWithoutExt, String charSet) {
        logger.info("uploadFile {}", multipartFile.getOriginalFilename());

        String fileNameWithExt = fileNameWithoutExt + CharUtil.DOT + FileTypeEnum.CSV.getValue();

        try {
            // Reconnect the shared session if it dropped; rebuild it entirely
            // after a failed reconnect attempt.
            AtomicInteger retryTimes = new AtomicInteger(0);
            while (!yamlSshSession.isConnected()) {
                try {
                    if (retryTimes.incrementAndGet() > 5) {
                        throw new DataScienceException(String.format(
                            "session reconnect retryTimes(%s) exceed max attempts",
                            retryTimes.get()));
                    }
                    yamlSshSession.connect();
                } catch (JSchException jSchException) {
                    logger.warn(
                        String.format(
                            "yamlSshSession reconnect failed, try to init a new session: %s",
                            jSchException.getMessage()),
                        jSchException);
                    yamlSshSession.disconnect();
                    initSession();
                }
            }

            ChannelSftp channelSftp = (ChannelSftp) yamlSshSession.openChannel(CHANNEL_TYPE_SFTP);
            channelSftp.connect();
            try (InputStream in = multipartFile.getInputStream()) {
                channelSftp.put(in, filepath + fileNameWithExt);
            } finally {
                channelSftp.disconnect();
            }
        } catch (JSchException | SftpException e) {
            throw new DataScienceException("文件上传GP服务器失败", e);
        } catch (IOException e) {
            throw new DataScienceException("文件读取失败", e);
        }
    }


    /***
     * Uploads a local csv file to the configured remote path, keeping its
     * original name. Reconnects the shared SSH session if it dropped.
     *
     * Fixes vs. original: the FileInputStream is now closed
     * (try-with-resources) and the SFTP channel is disconnected in a finally
     * block, so neither leaks when the upload fails.
     *
     * @param file    local file to upload
     * @param charSet declared charset (unused in this overload)
     */
    public void uploadFile(File file, String charSet) {
        logger.info("uploadFile {}", file.getName());

        String fileNameWithExt = file.getName();

        try {
            // Single reconnect attempt, matching the original behavior of this overload.
            if (!yamlSshSession.isConnected()) {
                yamlSshSession.connect();
            }
            ChannelSftp channelSftp = (ChannelSftp) yamlSshSession.openChannel(CHANNEL_TYPE_SFTP);
            channelSftp.connect();
            try (InputStream in = new FileInputStream(file)) {
                channelSftp.put(in, filepath + fileNameWithExt);
            } finally {
                channelSftp.disconnect();
            }
        } catch (JSchException | SftpException e) {
            throw new DataScienceException("文件上传GP服务器失败", e);
        } catch (IOException e) {
            throw new DataScienceException("文件读取失败", e);
        }
    }

    /**
     * Uploads the gpload yml config over SFTP, then runs gpload and the
     * follow-up masking / drop-table handling via {@link #execute}.
     *
     * Fixes vs. original: the entry log claimed "参数为空" unconditionally and
     * concatenated instead of using a placeholder; the charset log was missing
     * its {@code {}} placeholder (the argument was silently dropped); error
     * logs now include the stack trace; the SFTP channel is disconnected in a
     * finally block.
     *
     * @param importResDTO result holder passed through to execute()
     * @param config       yml file content; must not be empty
     * @param csvName      csv data file name already uploaded to the server
     * @param maskingSql   optional masking SQL
     * @param dropSql      optional drop-old-table SQL
     * @param targetTable  table being loaded
     * @param firstImport  whether this is the first import of the dataset
     * @param formatName   format identifier (not used in this method)
     * @param md5          file md5; on a missing-server-file error the cached
     *                     upload info for this md5 is evicted
     */
    public void uploadYmlAndExecGpLoadAndMasking(DatasetImportResDTO importResDTO, String config,
        String csvName, String maskingSql, String dropSql, String targetTable,
        Boolean firstImport, String formatName, String md5) {
        logger.info("uploadYmlAndExecGpLoadAndMasking config: {}", config);
        if (StringUtils.isEmpty(config)) {
            throw new DataScienceException("Upload file gp yml config is empty." + config);
        }

        try (InputStream configIs = new ByteArrayInputStream(config.getBytes(StandardCharsets.UTF_8))) {
            logger.info("configFile Charset: {}", Charset.defaultCharset().name());
            // Random remote name for the yml control file saved under filepath.
            String ymlFileName = RandomUtil.randomString(YML_FILE_NAME_SIZE) + ".yml";

            // Reconnect the shared session if it dropped; rebuild it entirely
            // after a failed reconnect attempt.
            AtomicInteger retryTimes = new AtomicInteger(0);
            while (!yamlSshSession.isConnected()) {
                try {
                    if (retryTimes.incrementAndGet() > 5) {
                        throw new DataScienceException(String.format(
                                "session reconnect retryTimes(%s) exceed max attempts",
                                retryTimes.get()));
                    }
                    yamlSshSession.connect();
                } catch (JSchException jSchException) {
                    logger.warn(
                            String.format(
                                    "yamlSshSession reconnect failed, try to init a new session: %s",
                                    jSchException.getMessage()),
                            jSchException);
                    yamlSshSession.disconnect();
                    initSession();
                }
            }
            ChannelSftp yamlSftp = (ChannelSftp) yamlSshSession.openChannel(CHANNEL_TYPE_SFTP);
            yamlSftp.connect();
            try {
                yamlSftp.put(configIs, filepath + ymlFileName);
            } finally {
                yamlSftp.disconnect();
            }
            execute(importResDTO, ymlFileName, csvName, maskingSql, dropSql, targetTable,
                firstImport);
        } catch (SftpException | JSchException | IOException e) {
            logger.error(e.getMessage(), e);
            throw new DataScienceException("Upload file error.", e);
        } catch (DataScienceException e) {
            logger.error(e.getMessage(), e);
            if (StringUtils.isNotBlank(md5)
                && e.getApiResult().getCode() == BaseErrorCode.SLICE_UPLOAD_FILE_NOT_FOUND.getCode()) {
                // A missing server file means the cached slice-upload state is stale.
                deleteFileCatchInfo(md5);
            }
            throw e;
        }
    }



    /**
     * Copies an input stream to a file, then closes both. Errors are logged
     * and swallowed (best-effort), matching the original contract.
     *
     * Fixes vs. original: both streams are now closed via try-with-resources,
     * so they no longer leak when the copy fails midway.
     *
     * @param ins  source stream; closed on return
     * @param file destination file, overwritten if it exists
     */
    private void inputStreamToFile(InputStream ins, File file) {
        try (InputStream in = ins;
            OutputStream os = new FileOutputStream(file)) {
            byte[] buffer = new byte[8192];
            int bytesRead;
            while ((bytesRead = in.read(buffer, 0, 8192)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
        } catch (Exception e) {
            logger.error("inputStreamToFile: ", e);
        }
    }

    /***
     * cvs文件预览
     * @return
     * @throws IOException
     */
    public PreviewDatasetVO parsingCVSFile(FileUploadRequestDTO fileUploadRequestDTO, String charSet,
        CsvSemiotic csvSemiotic, String fileNameExt, String formatName) {
        boolean needHeader = Boolean.parseBoolean(fileUploadRequestDTO.getFirstLineAsFields());
        try {
            InputStreamReader isr;
            if ("utf-8".equals(charSet) || "Unicode".equals(charSet)) {
                isr = new InputStreamReader(fileUploadRequestDTO.getFile().getInputStream(), "UTF-8");
            } else {
                isr = new InputStreamReader(fileUploadRequestDTO.getFile().getInputStream(), "GB2312");
            }
            BufferedReader br = new BufferedReader(isr);
            int lineCnt = 0;
//            int lineCntWithN = 1;
            boolean isFileWithN = false;
            if (fileUploadRequestDTO.getTotalChunks() > 1) {
                File theFile = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier(), FileConstant.FILE_ORIGIN + "_" + fileUploadRequestDTO.getChunkNumber() + ".csv");
                RandomAccessFile raf = new RandomAccessFile(theFile, "rw");
                long l = theFile.length();
                raf.seek(l - 1);
                int i = raf.read(); //10 or 13
                raf.close();
                if (i == 10 || i == 13) {
                    isFileWithN = true;
                }
                try (LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(theFile))){
                    lineNumberReader.skip(Long.MAX_VALUE);
                    lineCnt = lineNumberReader.getLineNumber();
                    lineCnt += 1; //实际上是读取换行符数量 , 所以需要+1
                    if (isFileWithN) lineCnt -= 1;
                } catch (IOException e) {
                    logger.error("读取行数失败: ", e);
                    return null;
                }
            }

            CsvReader csvReader = null;
            List<String> headers = null;
            List<HeadVO> heads = null;
            if (fileUploadRequestDTO.getChunkNumber()==0&&formatName.substring(formatName.length()-3).contains("0") && redisUtil.hasKey("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName)) {
                Object csvParseInfo = redisUtil.blpop("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                // 删除校验信息， 每次重复
                File file = new File(((CsvParseInfo) csvParseInfo).getPath());
                if(file.delete()){
                    logger.info(file.getName() + " 文件已被删除！");
                }else{
                    logger.info("文件删除失败！");
                }
            }
            // 生成临时文件，用于写入校验文件
            File temp = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier() + File.separator + formatName,
                    "Csv_temp_" + fileUploadRequestDTO.getIdentifier() + (fileUploadRequestDTO.getTotalChunks() > 1 ? "":"_" + fileNameExt) + ".csv");
            FileChannel mFileChannel = new FileOutputStream(temp, true).getChannel();
            Integer maxShow = 5000 / fileUploadRequestDTO.getTotalChunks() < 0 ? 1:5000 / fileUploadRequestDTO.getTotalChunks();

            if (fileUploadRequestDTO.getChunkNumber() == 0) {
                csvReader = new CsvReader(br, true, csvSemiotic, lineCnt, fileUploadRequestDTO.getChunkNumber(), fileUploadRequestDTO.getTotalChunks(), isFileWithN, maxShow);
                headers = csvReader.getHeaders(needHeader);
                heads = PreviewDatasetVO. toHead(headers, needHeader);
                double nowId = 1;
                if (!formatName.substring(formatName.length()-3).contains("0")) nowId = redisUtil.hincr(FileConstant.FILE_IS_ING, fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_" + -1, 1);
                if (nowId == 1) {
                    redisUtil.lSet(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, headers, expireTime);
                    // 写入标题
                    String res = String.join(csvReader.getSeparate(), headers);
                    // 获取缓冲区
                    ByteBuffer buf = ByteBuffer.allocateDirect(res.getBytes(StandardCharsets.UTF_8).length);
                    buf.clear();
                    // 存入数据到缓冲区
                    buf.put(res.getBytes(StandardCharsets.UTF_8));
                    buf.flip(); // 切换到读数据模式
                    while (buf.hasRemaining()) {
                        mFileChannel.write(buf);
                    }
                }
            } else {
                // 阻塞获取 保证了第一块先获取
//                firstPreviewData =
//                        (List<PreviewDatasetVO>)redisUtil.blpop(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + fileUploadRequestDTO.getCharSet() + "_" + 0);
                headers = (List<String>) redisUtil.blpop(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                redisUtil.lSet(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, headers, expireTime);
                heads = PreviewDatasetVO. toHead(headers, needHeader);
//                redisUtil.lSet(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + fileUploadRequestDTO.getCharSet() + "_" + 0, firstPreviewData, expireTime);
//                heads = firstPreviewData.get(0).getHead();
//                headers = firstPreviewData.get(0).getHeaders();
                csvReader = new CsvReader(br, false, csvSemiotic, lineCnt, fileUploadRequestDTO.getChunkNumber(), fileUploadRequestDTO.getTotalChunks(), isFileWithN, heads.size(), maxShow);
            }
            // 当前预览信息大小
            List<Entity> data = new LinkedList<>();
            int needRows = DatasetConstant.DATA_TYPE_CHECK_SIZE / fileUploadRequestDTO.getTotalChunks();
            int cnt = 0;
            // 处理数据部分
            while (csvReader.getState() != State.enddocument) {
                List<String> record = csvReader.readNext(false);
                if (record == null || record.size() == 0) continue;
                // 写成功的句子
                cnt++;
                String res = "\r\n" + String.join(csvReader.getSeparate(), record);
                // 获取缓冲区
                ByteBuffer buf = ByteBuffer.allocateDirect(res.getBytes(StandardCharsets.UTF_8).length);
                buf.clear();
                // 存入数据到缓冲区
                buf.put(res.getBytes(StandardCharsets.UTF_8));
                buf.flip(); // 切换到读数据模式
                while(buf.hasRemaining()) {
                    mFileChannel.write(buf);
                }
                if (cnt < needRows) {
                    Entity jo = new Entity();
                    for (int i = 0; i < heads.size(); i++) {
                        if (i >= record.size()) {
                            jo.set(heads.get(i).getName(), null);
                        } else {
                            jo.set(heads.get(i).getName(), record.get(i));
                        }
                    }
                    data.add(jo);
                }
            }

            // 如果，第一行不是header， 应该作为数据添加，且不改变字段名
            if (fileUploadRequestDTO.getChunkNumber() == 0 && !needHeader) {
                Entity firstLine = new Entity();
                for (int i = 0; i < heads.size(); i++) {
                    firstLine.set(heads.get(i).getName(), headers.get(i));
                }
                data.add(0, firstLine);
            }
            ErrorInfo errorInfo = csvReader.geterrorLineInfo();
            if (fileUploadRequestDTO.getChunkNumber() == 0) {
                errorInfo.setSuccessCount(cnt);
                CsvParseInfo csvParseInfo = CsvParseInfo
                        .builder()
                        .errorInfo(errorInfo)
                        .path(temp.getAbsolutePath())
                        .build();
                if (fileUploadRequestDTO.getTotalChunks() == 1 && redisUtil.hasKey("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName)) {
                    redisUtil.blpop("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                }
                redisUtil.lSet("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, csvParseInfo, expireTime);
                if (fileUploadRequestDTO.getTotalChunks() > 1) {
                    redisUtil.hset(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                            fileUploadRequestDTO.getChunkNumber() + "_end",
                            csvReader.getEndRow());
                }
            } else {
                // 该处阻塞设置
                Map<ErrorLineCode, ErrorLineCodeInfo> errorLineCodeInfoMap = errorInfo.getErrorLineCodeInfoMap();
                CsvParseInfo csvParseInfo = (CsvParseInfo) redisUtil.blpop("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                ErrorInfo partErrorInfo = csvParseInfo.getErrorInfo();
                int errorCount = partErrorInfo.getErrorCount() + errorInfo.getErrorCount();
                int successCnt = cnt + partErrorInfo.getSuccessCount();
                Map<ErrorLineCode, ErrorLineCodeInfo> partErrorLineCodeInfoMap =
                        partErrorInfo.getErrorLineCodeInfoMap();
                errorLineCodeInfoMap = Stream
                        .of(errorLineCodeInfoMap, partErrorLineCodeInfoMap)
                        .flatMap(x -> x.entrySet().stream())
                        .collect(
                                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) ->
                                        new ErrorLineCodeInfo(Stream.of(v1.getLines(), v2.getLines())
                                                .flatMap(x -> x.stream()).collect(Collectors.toList()),
                                                v1.getCount() + v2.getCount())
                                ));
                errorInfo = ErrorInfo.builder()
                        .errorCount(errorCount)
                        .successCount(successCnt)
                        .errorLineCodeInfoMap(errorLineCodeInfoMap)
                        .build();
                csvParseInfo = CsvParseInfo
                        .builder()
                        .errorInfo(errorInfo)
                        .path(temp.getAbsolutePath())
                        .build();
                redisUtil.lSet("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, csvParseInfo, expireTime);
                if (fileUploadRequestDTO.getTotalChunks() > 1) {
                    redisUtil.hset(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                            fileUploadRequestDTO.getChunkNumber() + "_begin",
                            csvReader.getBeginRow());
                    redisUtil.hset(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                            fileUploadRequestDTO.getChunkNumber() + "_end",
                            csvReader.getEndRow());
                }
            }
            redisUtil.lSet(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_"
                    + fileUploadRequestDTO.getChunkNumber(), data, expireTime);

            // 最后一块进行自动推荐数据类型
            if (fileUploadRequestDTO.getChunkNumber() == fileUploadRequestDTO.getTotalChunks() - 1) {

                // 汇总data
                List<Entity> FinalData = new LinkedList<>();
                for (int i = 0; i < fileUploadRequestDTO.getTotalChunks(); i++) {
                    FinalData.addAll((List<Entity>)  redisUtil.blpop(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_"
                            + i));
                }
                if (fileUploadRequestDTO.getTotalChunks() > 1) {
                    // 拼接
                    String line = "";
                    for (int i = 0; i < fileUploadRequestDTO.getTotalChunks(); i++) {
                        if (i == 0) {
                            line += (String) redisUtil.hget(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    i + "_end");
                        } else if (i != fileUploadRequestDTO.getTotalChunks() - 1) {
                            line += (String) redisUtil.hget(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    i + "_begin");
                            line += (String) redisUtil.hget(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    i + "_end");
                        } else {
                            line += (String) redisUtil.hget(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    i + "_begin");
                        }
                    }
                    // 解析
                    String[] text = line.split("\n");
                    CsvReader lineCsvReader = new CsvReader(text, csvSemiotic, headers.size());
                    cnt = 0;
                    while (lineCsvReader.getState() != State.enddocument) {
                        List<String> record = lineCsvReader.readText();
                        if (record == null || record.size() == 0) continue;
                        // 写成功的句子
                        cnt++;
                        String res = "\r\n" + String.join(lineCsvReader.getSeparate(), record);
                        // 获取缓冲区
                        ByteBuffer buf = ByteBuffer.allocateDirect(res.getBytes(StandardCharsets.UTF_8).length);
                        buf.clear();
                        // 存入数据到缓冲区
                        buf.put(res.getBytes(StandardCharsets.UTF_8));
                        buf.flip(); // 切换到读数据模式
                        while(buf.hasRemaining()) {
                            mFileChannel.write(buf);
                        }
                    }
                    ErrorInfo lineErrorInfo = lineCsvReader.geterrorLineInfo();
                    Map<ErrorLineCode, ErrorLineCodeInfo> errorLineCodeInfoMap = lineErrorInfo.getErrorLineCodeInfoMap();
                    CsvParseInfo csvParseInfo = (CsvParseInfo) redisUtil.blpop("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                    ErrorInfo partErrorInfo = csvParseInfo.getErrorInfo();
                    int errorCount = partErrorInfo.getErrorCount() + lineErrorInfo.getErrorCount();
                    int successCnt = cnt + partErrorInfo.getSuccessCount();
                    Map<ErrorLineCode, ErrorLineCodeInfo> partErrorLineCodeInfoMap =
                            partErrorInfo.getErrorLineCodeInfoMap();
                    errorLineCodeInfoMap = Stream
                            .of(errorLineCodeInfoMap, partErrorLineCodeInfoMap)
                            .flatMap(x -> x.entrySet().stream())
                            .collect(
                                    Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) ->
                                            new ErrorLineCodeInfo(Stream.of(v1.getLines(), v2.getLines())
                                                    .flatMap(x -> x.stream()).collect(Collectors.toList()),
                                                    v1.getCount() + v2.getCount())
                                    ));
                    lineErrorInfo = ErrorInfo.builder()
                            .errorCount(errorCount)
                            .successCount(successCnt)
                            .errorLineCodeInfoMap(errorLineCodeInfoMap)
                            .build();
                    csvParseInfo = CsvParseInfo
                            .builder()
                            .errorInfo(lineErrorInfo)
                            .path(temp.getAbsolutePath())
                            .build();
                    redisUtil.lSet("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, csvParseInfo, expireTime);
                }
                data = DatasetUtil.recommendDataType(heads, FinalData, DatasetConstant.DATA_PREVIEW_SIZE);
            }
            // 全部写完
            mFileChannel.close();
            return PreviewDatasetVO.builder()
                .data(data)
                .head(heads)
                .headers(headers)
                .owner(JwtUtil.getCurrentUserDTO().getName()).build();
        } catch (IOException e) {
            logger.error("parsingCVSFile IOException: ", e);
            return null;
        }
    }


    /**
     * Returns the accumulated csv parse result for the upload identified by {@code vo}
     * and uploads the locally validated csv copy to the GP server via sftp.
     *
     * The parse result is taken from redis with a blocking pop and immediately pushed
     * back so later readers still find it. The sftp upload is best-effort: failures are
     * logged and the error info is returned regardless.
     *
     * FIXES vs. previous version: removed dead local {@code count}; the
     * {@code FileInputStream} is now closed even when {@code put} throws
     * (try-with-resources); the sftp channel is disconnected on every path, not only on
     * success; the four identical returns are collapsed into one.
     *
     * @param vo separator/quote/escape, charset, identifier and filename of the upload
     * @return the {@link ErrorInfo} collected while parsing the file
     */
    public ErrorInfo parseErrorCSVFile(DatasetFileWriteVO vo) {
        // 去redis获取key为fileNameWithoutExt的值，即filename,如果没有那么是无法解析,或者解析没问题
        CsvSemiotic csvSemiotic = new CsvSemiotic(vo.getSeparate(),
                vo.getQuote(),
                vo.getEscape());
        String formatName = generateFileDifferentFormatName(vo.getCharSet(), csvSemiotic);
        String resultKey = "csvResult_" + vo.getIdentifier() + "_" + formatName;
        CsvParseInfo csvParseInfo = (CsvParseInfo) redisUtil.blpop(resultKey);
        // blpop removes the element; push it back so subsequent callers still see it.
        redisUtil.lSet(resultKey, csvParseInfo, expireTime);

        // 校验后结果不覆盖 -- the "_check" suffix keeps the validated copy separate.
        String serverCsvName = vo.getFilename() + "_check" + CharUtil.DOT + FileTypeEnum.CSV.getValue();
        ChannelSftp channelSftp = null;
        try (FileInputStream fileInputStream = new FileInputStream(csvParseInfo.getPath())) {
            channelSftp = (ChannelSftp) yamlSshSession.openChannel(CHANNEL_TYPE_SFTP);
            channelSftp.connect();
            channelSftp.put(fileInputStream, filepath + serverCsvName);
        } catch (JSchException | SftpException e) {
            logger.error("在GP服务器放入文件失败", e);
        } catch (FileNotFoundException e) {
            logger.error("parseErrorCSVFile FileNotFoundException: ", e);
        } catch (IOException e) {
            logger.error("parseErrorCSVFile IOException: ", e);
        } finally {
            if (channelSftp != null) {
                channelSftp.disconnect();
            }
        }
        return csvParseInfo.getErrorInfo();
    }

    /**
     * Strips any directory part and existing extension from {@code name} and appends
     * {@code type} as the new extension.
     */
    public static String getFileNameWithExt(String name, String type) {
        String baseName = FilenameUtils.getBaseName(name);
        return baseName + CharUtil.DOT + type;
    }


    /**
     * Generates a server-side name for an uploaded file: a prefix derived from the
     * file extension ("CSV-", "EXCEL-" or "TEMP-"), followed by the current timestamp,
     * the current user's id and a random suffix.
     *
     * NOTE(review): the extension check is case-sensitive ("x.CSV" falls through to
     * "TEMP-"), preserved as-is.
     */
    public String generateFileName(String fileName) {
        String prefix = fileName.endsWith(".csv") ? "CSV-"
            : (fileName.endsWith(".xls") || fileName.endsWith(".xlsx")) ? "EXCEL-"
            : "TEMP-";
        return new StringBuilder(prefix)
            .append(System.currentTimeMillis())
            .append(JwtUtil.getCurrentUserId())
            .append(RandomUtil.randomString(FILE_RANDOM_NAME_SIZE))
            .toString();
    }

    /***
     * 生成yml文件
     *
     * Builds the gpload YAML control-file content used to import {@code csvname} into
     * {@code targetTable} under schema "dataset".
     *
     * @param params      file name, import type and column definitions
     * @param targetTable destination table name
     * @param fieldFormat date columns mapped to the format detected during pre-validation
     * @param delimiter   unused here (the delimiter is taken from {@code csvSemiotic});
     *                    kept for signature compatibility with existing callers
     * @param charSet     source charset; see the ENCODING block below
     * @param csvSemiotic separator/quote/escape configuration of the csv
     * @param firstImport true when importing the raw upload for the first time
     * @return the YAML document as one string with CRLF line endings
     */
    public String generateGpConfig(DatasetFileWriteDTO params, String targetTable,
        Map<String, DateTimeFormatEnum> fieldFormat, String csvname, String delimiter,
        String charSet, CsvSemiotic csvSemiotic, boolean firstImport) {
        logger.info(
            "进入generateGpConfig()方法 参数为空：" + params.getFilename() + "==" + params.getImportType());

        // Resolve the effective csv symbols once. A NULL enum value means the user
        // supplied a custom ("other") symbol.
        // FIX: the escape check previously read "Escape.NULL.NULL" -- an enum constant
        // accessed through another constant; same value, but misleading.
        String separateSymbol = csvSemiotic.getSeparate() == Separate.NULL
            ? csvSemiotic.getOtherSeparate() : csvSemiotic.getSeparate().getSymbol();
        String escapeSymbol = csvSemiotic.getEscape() == Escape.NULL
            ? csvSemiotic.getOtherEscape() : csvSemiotic.getEscape().getSymbol();
        String quoteSymbol = csvSemiotic.getQuote() == Quote.NULL
            ? csvSemiotic.getOtherQuote() : csvSemiotic.getQuote().getSymbol();

        StringBuilder content = new StringBuilder();
        content.append("VERSION: ").append(gpLoadVersion).append("\r\n");
        content.append("DATABASE: ").append(gpLoadDatabase).append("\r\n");
        content.append("USER: ").append(gpUsername).append("\r\n");
        content.append("HOST: ").append(gpMasterHostName).append("\r\n");
        content.append("PORT: ").append(gpPort).append("\r\n");
        content.append("GPLOAD: ").append("\r\n");
        content.append("  INPUT: ").append("\r\n");
        content.append("    - SOURCE: ").append("\r\n");
        content.append("        LOCAL_HOSTNAME: ").append("\r\n");
        content.append("          - ").append(gpLoadLocalHostName).append("\r\n");
        content.append("        PORT_RANGE: ").append(gpLoadPortRange).append("\r\n");
        content.append("        FILE: ").append("\r\n");
        content.append("          - ").append(filepath).append(csvname).append("\r\n");
        content.append("    - COLUMNS: ").append("\r\n");
        params.getData().forEach(u -> {
            logger.info("字段名称：" + u.getName() + "字段类型：" + u.getType());
            //字段加上单引号，防止字段中有no等与python相关的特殊字符
            String string =
                "          - '" + SqlUtil.formatFieldNameWithSpace(u.getName().toLowerCase())
                    + "': ";
            content.append(string).append(DataTypeEnum.get(u.getType()).getGpType()).append("\r\n");
        });
        content.append("    - FORMAT: ").append("csv").append("\r\n");
        content.append("    - DELIMITER: '").append(separateSymbol).append("'\r\n");
        content.append("    - ESCAPE: '").append(escapeSymbol).append("'\r\n");
        content.append("    - QUOTE: '").append(quoteSymbol).append("'\r\n");
        content.append("    - MAX_LINE_LENGTH: ").append("1048576").append("\r\n");
        content.append("    - HEADER: ").append(getNeedHeaderFlag(params.getFirstLineAsFields()))
            .append("\r\n");
        // Re-imports and utf-8/Unicode uploads load as UTF8; anything else on a first
        // import is declared GBK.
        if (!firstImport || "utf-8".equals(charSet) || "Unicode".equals(charSet)) {
            content.append("    - ENCODING: ").append("UTF8").append("\r\n");
        } else {
            content.append("    - ENCODING: ").append("GBK").append("\r\n");
        }
        content.append("    - ERROR_LIMIT: ").append(25).append("\r\n");
        content.append("  OUTPUT: ").append("\r\n");
        content.append("    - TABLE: ").append("dataset.\"").append(targetTable).append("\"\r\n");
        content.append("    - MODE: ").append("INSERT").append("\r\n");
        content.append("    - MAPPING: ").append("\r\n");
        params.getData().forEach(u -> {
            String mapping = getMapping(u, fieldFormat);
            content.append("       ").append("'").append(SqlUtil.formatFieldName(u.getName()))
                .append("'").append(": ")
                .append(SqlUtil.formatFieldNameWithSpace(mapping)).append("\r\n");
        });
        return content.toString();
    }

    /**
     * mapping 对应yaml前面的columns中的字段名
     *
     * Builds the OUTPUT mapping expression for one column: date columns with a detected
     * format go through to_timestamp, int columns through the numeric/bigint cast, and
     * everything else is just the quoted field name.
     */
    private static String getMapping(DatasetColumnDTO fieldType,
        Map<String, DateTimeFormatEnum> fieldFormat) {
        String name = fieldType.getName();
        if (fieldFormat.containsKey(name)) {
            return "to_timestamp(" + wrapNameInFunction(name) + ", '"
                + fieldFormat.get(name).getGpPattern() + "')";
        }
        if (DataTypeEnum.INT.getValue().equals(fieldType.getType())) {
            return castToInt(name);
        }
        return wrapYmlFieldName(name);
    }

    /**
     * Maps a dataset column type to the GP storage type: date/json/array are stored as
     * varchar; every other type is passed through unchanged.
     */
    public String getGpType(String dataType) {
        boolean storedAsVarchar = DataTypeEnum.DATE.getValue().equals(dataType)
            || DataTypeEnum.JSON.getValue().equals(dataType)
            || DataTypeEnum.ARRAY.getValue().equals(dataType);
        return storedAsVarchar ? DataTypeEnum.VARCHAR.getValue() : dataType;
    }

    /** Single-quoted yaml expression casting the quoted field through numeric to bigint. */
    private static String castToInt(String name) {
        StringBuilder expr = new StringBuilder();
        expr.append(CharUtil.SINGLE_QUOTE)
            .append(wrapNameInFunction(name))
            .append("::numeric::bigint")
            .append(CharUtil.SINGLE_QUOTE);
        return expr.toString();
    }

    /**
     * Lower-cases the field name and surrounds it with double quotes.
     * NOTE(review): {@code toLowerCase()} is locale-sensitive; other call sites in this
     * file lower-case field names the same way, so any change must be made consistently.
     */
    private static String wrapNameInFunction(String name) {
        StringBuilder quoted = new StringBuilder();
        quoted.append(CharUtil.DOUBLE_QUOTES)
            .append(name.toLowerCase())
            .append(CharUtil.DOUBLE_QUOTES);
        return quoted.toString();
    }

    /** Double-quoted, lower-cased field name additionally wrapped in single quotes for yaml. */
    private static String wrapYmlFieldName(String name) {
        return String.valueOf(CharUtil.SINGLE_QUOTE)
            + wrapNameInFunction(name)
            + CharUtil.SINGLE_QUOTE;
    }

    /** HEADER flag for the yaml file; a null flag defaults to "true". */
    private String getNeedHeaderFlag(Boolean flag) {
        if (flag == null) {
            return Boolean.TRUE.toString();
        }
        return flag.toString();
    }

    /**
     * Reads the first PRE_VALIDATE_DATA_SIZE lines of the uploaded file from the GP
     * server (via ssh "head") and validates each record against the declared column
     * types, collecting the date/time format detected for every date column.
     *
     * FIXES vs. previous version: the inner read loop issued an extra
     * {@code readNext(false)} in its body whose result was immediately overwritten by
     * the loop condition, silently dropping records from validation; the exec channel
     * was never disconnected; the unused {@code cnt} counter is removed.
     *
     * @param dto         file name, import type and column definitions
     * @param csvSemiotic separator/quote/escape configuration of the csv
     * @param charSet     "utf-8"/"Unicode" is read as UTF-8, anything else as GB2312
     * @return field name -> matched date/time format (empty when the server is unreachable)
     * @throws DataScienceException propagated from doValidate when a value cannot be converted
     */
    public Map<String, DateTimeFormatEnum> preValidateFields(DatasetFileWriteDTO dto,
        CsvSemiotic csvSemiotic, String charSet) {
        ChannelExec channelExec;
        // k字段名, v转换匹配的格式.用于存日期类型数据匹配到的format
        Map<String, DateTimeFormatEnum> filedFormat = new HashMap<>();

        try {
            channelExec = (ChannelExec) yamlSshSession.openChannel(CHANNEL_TYPE_EXEC);
            if (channelExec == null) {
                throw new DataScienceException(
                    BaseErrorCode.DATASET_IMPORT_CONNECT_GP_SERVER_ERROR);
            }
        } catch (JSchException e) {
            logger.warn("pre validate connect gp server error", e);
            return filedFormat;
        }

        try (InputStream in = channelExec.getInputStream()) {
            channelExec.setErrStream(System.err);
            // 查看文件前PRE_VALIDATE_DATA_SIZE行
            channelExec.setCommand(StrUtil.format("head -{} {}",
                PRE_VALIDATE_DATA_SIZE,
                filepath + getFileNameWithExt(dto.getFilename(), dto.getImportType())));
            channelExec.connect();

            Charset charset = ("utf-8".equals(charSet) || "Unicode".equals(charSet))
                ? StandardCharsets.UTF_8 : Charset.forName("GB2312");
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset));
            CsvReader csvReader = new CsvReader(reader, dto.getFirstLineAsFields(), csvSemiotic);

            // 处理数据部分
            while (csvReader.getState() != State.enddocument) {
                List<String> record = null;
                // Keep reading until a complete record arrives or the document ends.
                while (csvReader.getState() != State.enddocument
                    && (record = csvReader.readNext(false)) == null) {
                    // intentionally empty: the condition performs the read
                }
                if (record == null) {
                    break;
                }
                doValidate(record, dto.getData(), filedFormat);
            }
        } catch (IOException | JSchException e) {
            logger.warn("pre validate connect gp server error", e);
        } finally {
            // Release the exec channel on every path (previously leaked).
            channelExec.disconnect();
        }
        return filedFormat;
    }

    /**
     * Validates one csv record against the declared column types.
     *
     * Each value is checked (and, for date columns, format-matched) via
     * DataTypeValidator; matched formats are recorded into {@code filedFormat}.
     * Conversion failures are collected and thrown together at the end.
     *
     * @throws DataScienceException when any value cannot be converted to its column type
     */
    public void doValidate(List<String> R, List<DatasetColumnDTO> fields,
        Map<String, DateTimeFormatEnum> filedFormat) {
        String[] record = R.toArray(new String[0]);
        Preconditions.checkLessAndEqual(record.length, fields.size(),
            StrUtil.format("csv文件中的数据字段数量超过表的字段数，请检查。data:{} ；data size:{} ； table size:{}",
                Arrays.toString(record), record.length, fields.size()));

        StringBuilder errorData = new StringBuilder();
        for (int i = 0; i < record.length; i++) {
            DateTimeFormatEnum matchFormat;
            try {
                matchFormat = DataTypeValidator
                    .validateAndMatch(record[i], fields.get(i).getType());
            } catch (DataScienceException e) {
                matchFormat = null;
                if (errorData.length() > 0) {
                    errorData.append(Constant.NEW_LINE_CHARACTER);
                }
                errorData.append(StrUtil.format("字段：{} 的值： {} 类型转换错误，无法转换为 {} 类型",
                    fields.get(i).getName(), record[i], fields.get(i).getType()));
            }
            updateFiledFormat(filedFormat, fields.get(i).getName(), matchFormat);
        }
        if (errorData.length() > 0) {
            throw DataScienceException
                .of(BaseErrorCode.DATASET_IMPORT_DATA_TRANSFORM_FAIL, null, errorData.toString());
        }
    }

    /**
     * Records the date/time format matched for {@code field}. A null match is ignored;
     * otherwise the map keeps whichever format has the strictly higher priority
     * (ties keep the existing entry).
     */
    private void updateFiledFormat(Map<String, DateTimeFormatEnum> filedFormat, String field,
        DateTimeFormatEnum matchFormat) {
        if (matchFormat != null) {
            filedFormat.merge(field, matchFormat, (current, candidate) ->
                candidate.getPriority() > current.getPriority() ? candidate : current);
        }
    }

    /***
     * 连接数据库 创建表
     *
     * Creates dataset."targetTable" with one column per entry of {@code data} (GP type
     * resolved via DataTypeEnum) plus a serial primary-key column as the row id.
     *
     * FIXES vs. previous version: the {@code Statement} is now closed via
     * try-with-resources, and the full exception (not just its message) is logged so
     * the stack trace is preserved.
     *
     * @throws DataScienceException when the DDL fails (e.g. a column name collides
     *                              with a database keyword)
     */
    public void createTable(String targetTable, List<DatasetColumnDTO> data) {
        logger.info("进入createTable()方法 参数为空：" + data);
        StringBuilder sql = new StringBuilder("CREATE TABLE IF NOT EXISTS dataset.\"")
            .append(targetTable).append("\"(");
        data.forEach(u -> {
            sql.append("\"").append(SqlUtil.formatFieldName(u.getName())).append("\"")
                .append(" ").append(DataTypeEnum.get(u.getType()).getGpType())
                .append(",");
        });
        // Serial primary key used as the default row id column.
        sql.append(JDBCUtil.GP_WRAPPER.wrap(DatasetConstant.DEFAULT_ID_FIELD))
            .append(" serial PRIMARY KEY )");
        try (Connection connection = DriverManager.getConnection(gpUrl, gpUsername, gpPassword);
                Statement statement = connection.createStatement()) {
            logger.info("是否成功连接pg数据库" + connection);
            logger.info("execute: " + sql + "=====");
            statement.execute(sql.toString());
        } catch (SQLException e) {
            logger.error("createTable SQLException: ", e);
            throw DataScienceException
                    .of(BaseErrorCode.DATASET_IMPORT_CSV_CREATETABLE_ERROR, null, "某个列字段名称为数据库关键字，请修改文件后重新上传");
        }
    }

    /***
     * 连接数据库 删除表
     *
     * Drops dataset."targetTable".
     *
     * FIX vs. previous version: the {@code Statement} is closed via try-with-resources;
     * a copy-pasted, misleading comment was removed.
     *
     * @throws DataScienceException when the table cannot be dropped
     */
    public void dropTable(String targetTable) {
        String sql = "DROP TABLE dataset.\"" + targetTable + "\"";
        try (Connection connection = DriverManager.getConnection(gpUrl, gpUsername, gpPassword);
                Statement statement = connection.createStatement()) {
            logger.info("是否成功连接pg数据库" + connection);
            logger.info("execute: " + sql + "=====");
            statement.execute(sql);
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
            throw new DataScienceException("删除表异常", e);
        }
    }

    /**
     * 给特定表格的特定字段添加索引 _record_id_
     *
     * @param targetTable 表名 CREATE INDEX "record" ON "dataset"."i_58_1622453272631" USING btree
     *                    ("_record_id_");
     */
    private void createIndex(String targetTable) {
        String indexName = targetTable + "_record";
        String sql = "CREATE INDEX " + indexName + " ON dataset.\"" + targetTable +
            "\" USING btree (\"" + DatasetConstant.DEFAULT_ID_FIELD + "\")";
        // FIX: Statement is now closed via try-with-resources (previously left open
        // until the connection closed).
        try (Connection connection = DriverManager.getConnection(gpUrl, gpUsername, gpPassword);
                Statement statement = connection.createStatement()) {
            statement.execute(sql);
        } catch (SQLException e) {
            // Best effort: a missing index only degrades performance, so the failure is
            // logged rather than rethrown (original behavior, kept deliberately).
            logger.error("创建索引异常", e);
        }
    }

    public DatasetImportResDTO writeToGP(DatasetFilesWriteVO vo, String charSet) {
        CsvSemiotic csvSemiotic = new CsvSemiotic(vo.getSeparate(), vo.getQuote(), vo.getEscape());
        String formatName = generateFileDifferentFormatName(charSet, csvSemiotic);
        Long id = JwtUtil.getCurrentUserDTO().getId();
        DatasetImportResDTO res = DatasetImportResDTO.builder().successed(true).build();
        String categoryId = vo.getCategoryId();
        String importType = vo.getImportType();
        String datasetName = vo.getName();
        String separator = DatasetConstant.DEFAULT_DATA_SEPARATOR;
        String realImportType = importType.toLowerCase();
        List<DatasetFileWriteFileVO> files = vo.getFiles();
        if ("excel".equalsIgnoreCase(importType)) {
            // 转成csv后缀都是csv
            vo.setFirstImport(false);
            csvSemiotic = new CsvSemiotic(",", "\"", "\"");
            importType = "csv";
            try {
                if (!checkEnd(files)) {
                    return DatasetImportResDTO.builder().datasetId(-1L).build();
                }
            } catch (QueryTimeoutException e) {
                logger.error("writeToGP.check End error, redis timeout: {}", e.getMessage());
                return DatasetImportResDTO.builder().datasetId(-2L).build();
            }
        }
        String delimiter = csvSemiotic.getSeparate() == Separate.NULL ? csvSemiotic.getOtherSeparate():csvSemiotic.getSeparate().getSymbol();
        int i = 0;
        for (DatasetFileWriteFileVO file : files) {
            /* 兼容原版 */
            DatasetFileWriteDTO dto = new DatasetFileWriteDTO();
            dto.setCategoryId(categoryId);
            dto.setImportType(importType);
            dto.setFilename(file.getFilename());
            // 第一行是否是字段名
            dto.setFirstLineAsFields(file.getFirstLineAsFields());
            // 字段的信息
            List<DatasetColumnDTO> colTypes = file.getData();
            for (DatasetColumnDTO typeDto : colTypes) {
                typeDto.setName(SqlUtil.formatFieldName(typeDto.getName()));//format column name
            }

            dto.setData(file.getData());
            if ("excel".equalsIgnoreCase(realImportType)) {
                dto.setName(datasetName + "_" + file.getSheetName());
            } else {
//                if (!vo.getFirstImport()) redisUtil.del("csvResult_" + file.getIdentifier());
                dto.setName(datasetName);
            }

            //文件名去空格
            String newFileName = dto.getFilename().replaceAll("\\s*", "");
            dto.setFilename(newFileName);
            //验证字段类型
            Map<String, DateTimeFormatEnum> fieldFormat = preValidateFields(dto, csvSemiotic,
                charSet);
            /* 根据验证结果，date转为time（入参没有time格式，只有date格式） */
            if (fieldFormat != null || !fieldFormat.isEmpty()) {
                for (DatasetColumnDTO typeDto : colTypes) {
                    if (fieldFormat.containsKey(typeDto.getName())) {
                        DateTimeFormatEnum df = fieldFormat.get(typeDto.getName());
                        if (DateTimeFormatTypeEnum.TIME == df.getType()) {
                            typeDto.setType("time");
                            //移除，防止后面生成yml文件的mapping时，time格式使用to_timestamp函数报错
                            fieldFormat.remove(typeDto.getName());
                        }
                    }
                }
            }

            //生成gp目标表名
            String targetTable = ImportDataService.generateGpTableName();
            //根据importType获取带后缀文件名
            String serverCsvName = getFileNameWithExt(dto.getFilename(), dto.getImportType());
            //生成配置信息
            String generateGpConfig = generateGpConfig(dto, targetTable, fieldFormat, serverCsvName,
                delimiter, charSet, csvSemiotic, vo.getFirstImport());
            //建表
            createTable(targetTable, colTypes);

            //生成数据脱敏和控制处理相关sql,暂不进行空值补全，后面加上选项让用户自主选择
            Map<String, String> map = generateMaskingAndDropTableSql(targetTable, colTypes, false);
            targetTable = map.get(DatabaseConstant.MAP_TABLE_NAME_KEY);
            String maskingSql = map.get(DatabaseConstant.MAP_MASKING_SQL_KEY);
            String dropSql = map.get(DatabaseConstant.MAP_DROP_TABLE_SQL_KEY);

            //上传yuml配置文件并执行gpload命令以及数据二次处理
            uploadYmlAndExecGpLoadAndMasking(res, generateGpConfig, serverCsvName, maskingSql,
                dropSql, targetTable, vo.getFirstImport(), formatName, file.getIdentifier());

            DatasetJsonInfo dj = DatasetJsonInfo.builder()
                .schema(DatabaseConstant.GREEN_PLUM_DEFAULT_SCHEMA)
                .table(targetTable)
                .type(realImportType)
                .columnMessage(file.getData())
                .build();

            /* 保存关联信息到数据集 */
            if (i == 0) {
                //第一个数据集id作为返回值
                Long did = datasetService
                    .saveDataset(id, Long.valueOf(dto.getCategoryId()), dto.getName(), dj);
                res.setDatasetId(did);
            } else {
                datasetService
                    .saveDataset(id, Long.valueOf(dto.getCategoryId()), dto.getName(), dj);
            }
            i++;
        }

        return res;
    }

    /**
     * Builds the SQL that applies data masking and (optionally) null completion
     * to a freshly loaded GP table, together with the SQL that drops the raw table.
     * When no column needs rewriting, the original table name is returned with
     * empty masking/drop SQL.
     *
     * @param targetTable    name of the raw table just loaded by gpload
     * @param data           column definitions (type, masking type, import flag)
     * @param nullCompletion when true, NULL numerics become 0 and NULL
     *                       varchar/text values become the empty string
     * @return map with keys {@code DatabaseConstant.MAP_TABLE_NAME_KEY} (effective
     *         table name), {@code MAP_MASKING_SQL_KEY} and {@code MAP_DROP_TABLE_SQL_KEY}
     */
    public Map<String, String> generateMaskingAndDropTableSql(String targetTable,
        List<DatasetColumnDTO> data, boolean nullCompletion) {
        // parameterized logging; the old concatenated message ("参数为空" = "argument
        // is empty") was misleading — it always printed regardless of content
        logger.info("generateMaskingAndDropTableSql() columns: {}", data);

        Map<String, String> map = new HashMap<>();
        String newTable = targetTable + "_m";
        String dbName = DatabaseConstant.GREEN_PLUM_DEFAULT_SCHEMA;
        String sql = "create table " + dbName + "." + JDBCUtil.GP_WRAPPER.wrap(newTable) + " as ";
        StringBuilder sb = new StringBuilder();
        sb.append("select ");
        // true once at least one column requires rewriting (mask, drop, or null fill)
        boolean flag = false;
        for (DatasetColumnDTO dt : data) {
            // null completion rewrites every non-date column
            if (!"date".equals(dt.getType()) && nullCompletion) {
                flag = true;
            }
            String mt = dt.getDataMaskingType();
            String col = JDBCUtil.GP_WRAPPER.wrap(SqlUtil.formatFieldName(dt.getName()));

            if (dt.getImportColumn() != null && !dt.getImportColumn()) {
                flag = true;
                // column excluded from import — leave it out of the select list
                continue;
            }
            if (StringUtils.isNotBlank(mt)) {
                mt = mt.toLowerCase();
                switch (mt) {
                    case "md5":
                    case "sha1":
                        // hmac digest; the generated SQL strips the hex "\x" prefix,
                        // NULLs become ''
                        sb.append("case when ").append(col).append(" is null then '' else ")
                            .append(String.format(DatabaseConstant.HMAC_WITHOUT_HEX_PREFIX_SQL, col,
                                DatabaseConstant.SECRET_KEY, mt))
                            .append(" end ").append(col).append(",");
                        flag = true;
                        break;
                    case "mosaic":
                        // string mosaic (partial character masking)
                        sb.append(String
                            .format(DatabaseConstant.MOSAIC_SQL, col, col, col, col, col, col, col,
                                col, col, col, col, col, col));
                        flag = true;
                        break;
                    default:
                        if (("int".equals(dt.getType()) || "decimal".equals(dt.getType()))
                            && nullCompletion) {
                            // NULL numeric values become 0
                            sb.append(String.format(DatabaseConstant.COALESCE_ZERO_SQL, col, col));
                        } else if (("varchar".equals(dt.getType()) || "text".equals(dt.getType()))
                            && nullCompletion) {
                            // NULL varchar/text values become ''
                            sb.append(
                                String.format(DatabaseConstant.COALESCE_EMPTY_STR_SQL, col, col));
                        } else {
                            sb.append(col).append(",");
                        }
                        break;
                }

            } else {
                if (("int".equals(dt.getType()) || "decimal".equals(dt.getType()))
                    && nullCompletion) {
                    sb.append(String.format(DatabaseConstant.COALESCE_ZERO_SQL, col, col));
                } else if (("varchar".equals(dt.getType()) || "text".equals(dt.getType()))
                    && nullCompletion) {
                    sb.append(String.format(DatabaseConstant.COALESCE_EMPTY_STR_SQL, col, col));
                } else {
                    sb.append(col).append(",");
                }
            }
        }
        if (!flag) {
            // nothing to rewrite — keep the raw table, no masking/drop SQL needed
            map.put(DatabaseConstant.MAP_TABLE_NAME_KEY, targetTable);
            map.put(DatabaseConstant.MAP_MASKING_SQL_KEY, "");
            map.put(DatabaseConstant.MAP_DROP_TABLE_SQL_KEY, "");
            return map;
        }
        sb.append(JDBCUtil.GP_WRAPPER.wrap(DatasetConstant.DEFAULT_ID_FIELD))
            .append(" from ").append(dbName).append(".")
            .append(JDBCUtil.GP_WRAPPER.wrap(targetTable))
            .append(";");
        sql += sb.toString();

        // drop the raw table once the masked copy exists
        String dropSql =
            "drop table if exists " + dbName + "." + JDBCUtil.GP_WRAPPER.wrap(targetTable) + ";";

        map.put(DatabaseConstant.MAP_TABLE_NAME_KEY, newTable);
        map.put(DatabaseConstant.MAP_MASKING_SQL_KEY, sql);
        map.put(DatabaseConstant.MAP_DROP_TABLE_SQL_KEY, dropSql);

        logger.info("masking sql：{}", sql);
        logger.info("drop sql：{}", dropSql);

        return map;
    }

    /**
     * Checks whether any field name contains whitespace or special characters.
     *
     * @param heads column headers to inspect
     * @return true when at least one trimmed name matches the special-character regex
     */
    public boolean checkHead(List<HeadVO> heads) {
        return heads.stream()
            .anyMatch(head -> head.getName().trim()
                .matches(DatasetConstant.SPECIAL_CHARACTER_REGEX));
    }

    /**
     * Runs a GP "copy ... to" command over the SSH exec channel so the table
     * content is written to a csv file on the GP server's local filesystem.
     *
     * @param tableName table to export
     * @param filepath  absolute path of the csv file to create on the server
     * @return true when the command channel connected successfully, false on SSH failure
     */
    public boolean copyGpTableToLocal(String tableName, String filepath) {
        ChannelExec channelExec = null;
        try {
            channelExec = (ChannelExec) yamlSshSession.openChannel(CHANNEL_TYPE_EXEC);
            // NOTE(review): tableName/filepath are interpolated directly into the
            // command — callers must pass only trusted, internally generated values
            String command = String.format("copy %s to '%s' with csv header", tableName, filepath);
            channelExec.setCommand(command);
            channelExec.setInputStream(null);
            channelExec.setOutputStream(System.err);
            channelExec.connect();
        } catch (JSchException e) {
            // log through the class logger (was printStackTrace()) and keep the cause
            logger.error("copyGpTableToLocal failed, table={}, path={}", tableName, filepath, e);
            return false;
        } finally {
            if (channelExec != null) {
                channelExec.disconnect();
            }
        }
        return true;
    }

    /**
     * Downloads a file from the GP server via sftp.
     *
     * @param remoteFilePath path on the GP server
     * @param localFilePath  local destination path
     * @return true when the transfer succeeded, false on SSH/sftp failure
     */
    public boolean downloadFile(String remoteFilePath, String localFilePath) {
        ChannelSftp channelSftp = null;
        try {
            channelSftp = (ChannelSftp) yamlSshSession.openChannel(CHANNEL_TYPE_SFTP);
            channelSftp.connect();
            channelSftp.get(remoteFilePath, localFilePath);
        } catch (JSchException | SftpException e) {
            // multi-catch; log through the class logger (was printStackTrace()) with cause
            logger.error("downloadFile failed, remote={}, local={}", remoteFilePath,
                localFilePath, e);
            return false;
        } finally {
            if (channelSftp != null) {
                channelSftp.disconnect();
            }
        }
        return true;
    }

    /**
     * Parses one uploaded file (or csv chunk) for preview and ships it to the GP
     * server. csv files are uploaded chunk by chunk and merged server-side after
     * the last chunk; excel files are read in full for preview.
     *
     * @param fileUploadRequestDTO upload request (file, chunk numbers, identifier)
     * @param fileNameExt          server-side base file name (no extension)
     * @param charSet              character set of the uploaded file
     * @param csvSemiotic          csv separator/quote/escape configuration
     * @param formatName           key suffix encoding the charset + semiotic combination
     * @return preview rows plus header-validation tips
     * @throws IOException when reading the uploaded excel file fails
     */
    public PreviewDatasetListVO uploadFiles(FileUploadRequestDTO fileUploadRequestDTO,
        String fileNameExt, String charSet, CsvSemiotic csvSemiotic, String formatName) throws IOException {
        boolean needHeader = Boolean.parseBoolean(fileUploadRequestDTO.getFirstLineAsFields());
        // preview data collected for the response
        List<PreviewDatasetVO> previewData = new ArrayList<>();
        String filename = Objects.requireNonNull(fileUploadRequestDTO.getFilename().toLowerCase());
        StringBuilder tips = new StringBuilder();
        PreviewDatasetListVO previewDatasetList = new PreviewDatasetListVO();
        if (filename.endsWith(".csv")) {
            /* csv upload */
            PreviewDatasetVO parsingCVSFile;
            if (fileUploadRequestDTO.getTotalChunks() > 1) {
                if (fileUploadRequestDTO.getChunkNumber() != 0) {
                    // follow-up chunk: reuse the server-side file name stored by chunk 0
                    fileNameExt = (String) redisUtil.get(FileConstant.FILE_UPLOAD_FILENAME + "_" + fileUploadRequestDTO.getIdentifier());
                } else {
                    // first chunk: publish the file name exactly once (setnx) so that
                    // concurrent chunk uploads agree on a single server-side name
                    if (redisUtil.setnx(FileConstant.FILE_UPLOAD_FILENAME + "_" + fileUploadRequestDTO.getIdentifier(), fileNameExt, expireTime)) {
                        fileNameExt = (String) redisUtil.get(FileConstant.FILE_UPLOAD_FILENAME + "_" + fileUploadRequestDTO.getIdentifier());
                    }
                }
            }
            // parse this chunk for preview
            parsingCVSFile = parsingCVSFile(fileUploadRequestDTO, charSet, csvSemiotic, fileNameExt, formatName);
            parsingCVSFile.setName(filename.substring(0, filename.lastIndexOf(".")));

            try {
                // push the chunk to the GP server as <name>_<chunkNumber>;
                // failure is logged but not fatal for the preview response
                uploadFile(fileUploadRequestDTO.getFile(), fileNameExt + "_" + fileUploadRequestDTO.getChunkNumber(), charSet);
            } catch (Exception e) {
                logger.error("uploadFilesError: ", e);
            }
            parsingCVSFile.setFileName(fileNameExt);
            previewData.add(parsingCVSFile);
            if (fileUploadRequestDTO.getChunkNumber() == fileUploadRequestDTO.getTotalChunks() - 1) {
                try {
                    // last chunk: concatenate all chunk files on the GP server into one csv
                    ChannelExec channelExec = (ChannelExec) yamlSshSession.openChannel(CHANNEL_TYPE_EXEC);
                    if (channelExec == null) {
                        throw new DataScienceException(
                                BaseErrorCode.DATASET_IMPORT_CONNECT_GP_SERVER_ERROR);
                    }
                    // NOTE(review): the exec channel is never disconnected here —
                    // confirm whether session teardown cleans it up
                    channelExec.setCommand(StrUtil.format("for ((i=0;i<{};i++))do echo {}$i.csv;done | xargs -i cat \\{} > {}", fileUploadRequestDTO.getTotalChunks(),
                            filepath + fileNameExt + "_",
                            filepath + fileNameExt + ".csv"));
                    channelExec.connect();
                } catch (JSchException e) {
                    logger.error("合并文件失败", e);
                    throw new DataScienceException(
                            BaseErrorCode.DATASET_IMPORT_CONNECT_GP_SERVER_CAT_ERROR);
                }
                // presumably a "0" in the last 3 chars of formatName marks a
                // non-default charset/semiotic combination — TODO confirm
                if (!formatName.substring(formatName.length()-3).contains("0")) redisUtil.lSet(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, previewData, expireTime);
                else {
                    redisUtil.blpop(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                }
            }
        } else {
            /* excel upload */
            boolean isLargeFile = fileUploadRequestDTO.getFile().getSize() > FileUtil.strSize2Long(bigFileThreshold);
            Workbook wk = FileImportUtil.readExcel(fileUploadRequestDTO.getFile());
            // hard cap on sheet count to keep preview/import bounded
            if (wk.getNumberOfSheets() > 10) {
                throw DataScienceException
                    .of(BaseErrorCode.UPLOAD_EXCEL_SHEETS_TOO_MANY_ERROR,
                        "please split your file.");
            }
            previewData = FileImportUtil
                .readForPreview(wk, needHeader, fileNameExt, isLargeFile);
        }
        if (!previewData.isEmpty()) {
            // warn about field names containing whitespace / special characters
            for (PreviewDatasetVO pv : previewData) {
                if (checkHead(pv.getHead())) {
                    tips.append(DatasetConstant.TIPS_SPECIAL_CHARACTER);
                }
            }
        }
        previewDatasetList.setTips(tips.toString());
        previewDatasetList.setPreviewDatasetVOS(previewData);

        return previewDatasetList;
    }

    /**
     * Converts every sheet of an uploaded excel file to a temporary csv and ships
     * each csv to the GP server, then signals completion ("end"/"error") through redis.
     *
     * @param file           uploaded excel file
     * @param containsHeader whether the first row holds the field names
     * @param fileNameExt    base name for the temp csv files and the redis status key
     */
    public void importExcelToGP(MultipartFile file, boolean containsHeader, String fileNameExt) {
        Workbook workbook = null;
        try {
            workbook = FileImportUtil.readExcel(file);
            int sheetCount = workbook.getNumberOfSheets();
            for (int sheetIndex = 0; sheetIndex < sheetCount; sheetIndex++) {
                Sheet sheet = workbook.getSheetAt(sheetIndex);
                // temp csv name: <base>_<sheet name with all whitespace stripped>
                String fileName = fileNameExt + CharUtil.UNDERLINE
                    + sheet.getSheetName().replaceAll("\\s*", "");
                // readSheet returns 0 for an empty sheet — nothing to upload then
                if (readSheet(sheet, fileName, containsHeader, tempfilepath) != 0) {
                    importCsvToGP(fileName);
                }
            }
            redisUtil.lrPush(fileNameExt, "end");
        } catch (IOException e) {
            logger.error("importExcelToGp:", e);
            redisUtil.lrPush(fileNameExt, "error");
        } finally {
            redisUtil.expire(fileNameExt, 60 * 60);
            close(workbook);
        }
    }

    /**
     * Uploads a locally generated csv file to the GP server via sftp.
     *
     * @param fileName temp file name without extension
     * @throws DataScienceException when the sftp transfer fails or the local file is missing
     */
    public void importCsvToGP(String fileName) {
        String fileNameWithExt = fileName + CharUtil.DOT + FileTypeEnum.CSV.getValue();
        InputStream inputStream = null;
        ChannelSftp channelSftp = null;
        try {
            inputStream = new FileInputStream(tempfilepath + fileNameWithExt);
            channelSftp = (ChannelSftp) yamlSshSession.openChannel(CHANNEL_TYPE_SFTP);
            channelSftp.connect();
            channelSftp.put(inputStream, filepath + fileNameWithExt);
        } catch (JSchException | SftpException e) {
            throw new DataScienceException("文件上传GP服务器失败", e);
        } catch (FileNotFoundException e) {
            throw new DataScienceException("文件上传失败，请重新上传文件", e);
        } finally {
            // disconnect in finally so the channel is released even when put() throws
            // (previously it leaked on the exception path)
            if (channelSftp != null) {
                channelSftp.disconnect();
            }
            close(inputStream);
        }
    }

    /**
     * Blocks until the excel-to-GP import for the first file reports a status
     * through redis, then re-publishes terminal statuses for other waiters.
     *
     * @param files file descriptors; the redis key is the first file name's prefix
     * @return false when the wait timed out (null) or the import reported "error"
     * @throws QueryTimeoutException when redis itself times out
     */
    public boolean checkEnd(List<DatasetFileWriteFileVO> files) throws QueryTimeoutException {
        String key = files.get(0).getFilename().split("_")[0];
        String status = (String) redisUtil.blpop(key);
        if (status == null) {
            return false;
        }
        boolean terminal = "error".equals(status) || "end".equals(status);
        if (terminal) {
            // push the terminal marker back so concurrent waiters also see it
            redisUtil.lrPush(key, status);
            redisUtil.expire(key, 60 * 60);
        }
        return !"error".equals(status);
    }

    /**
     * Applies the configured masking types to the preview rows so the user can
     * see the masked result before importing. Columns flagged as not-imported
     * are dropped from both the header and the rows.
     *
     * @param vo column configuration plus the raw preview rows
     * @return header metadata and the (possibly masked) preview rows
     */
    public DatasetAndHeadVO generateMask(DatasetColInfoVO vo) {
        DatasetAndHeadVO result = new DatasetAndHeadVO();
        List<DatasetColumnDTO> columns = vo.getBaseTableConfigVO().getData();

        // headers: only the columns that will actually be imported
        List<HeadVO> head = new ArrayList<>();
        for (DatasetColumnDTO column : columns) {
            if (!column.getImportColumn()) {
                continue;
            }
            head.add(HeadVO.builder()
                .name(column.getName())
                .type(column.getType())
                .recommendType(column.getType())
                .semantic(column.getSemantic())
                .recommendSemantic(column.getSemantic())
                .build());
        }
        result.setHead(head);

        // rows: mask each value according to its column's masking type
        List<Entity> data = new LinkedList<>();
        for (JSONObject row : vo.getColData()) {
            Entity entity = new Entity();
            for (DatasetColumnDTO column : columns) {
                if (!column.getImportColumn()) {
                    continue;
                }
                String name = column.getName();
                String value = (String) row.get(name);
                String masked = maskSingleValue(column.getDataMaskingType(), value);
                // fall back to the raw value when no masking applies or it failed
                entity.set(name, masked != null ? masked : value);
            }
            data.add(entity);
        }
        result.setData(data);
        return result;
    }

    /**
     * Masks one value with the given masking type; returns null when no masking
     * applies or the crypto call fails (the caller then keeps the raw value).
     */
    private String maskSingleValue(String maskingType, String value) {
        if (StringUtils.isBlank(maskingType)) {
            return null;
        }
        switch (maskingType.toLowerCase()) {
            case "md5":
                try {
                    return CryptoUtil.hmacMD5(value);
                } catch (Exception e) {
                    logger.error("md5 desensitization error", e);
                }
                break;
            case "sha1":
                // hmac digest with the hex "\x" prefix removed — per CryptoUtil
                try {
                    return CryptoUtil.hmacSHA1(value);
                } catch (Exception e) {
                    logger.error("sha1 desensitization error", e);
                }
                break;
            case "mosaic":
                // string mosaic (partial character masking)
                try {
                    return CryptoUtil.stringMosaic(value);
                } catch (Exception e) {
                    logger.error("mosaic desensitization error", e);
                }
                break;
            default:
                break;
        }
        return null;
    }

    public FileUploadDTO checkFileMd5(FileUploadRequestDTO fileUploadRequestDTO) {
        CsvSemiotic csvSemiotic = new CsvSemiotic(fileUploadRequestDTO.getSeparate(),
                fileUploadRequestDTO.getQuote(),
                fileUploadRequestDTO.getEscape());
        String formatName = generateFileDifferentFormatName(fileUploadRequestDTO.getCharSet(), csvSemiotic);
        Object isComplete = null;
        if (!formatName.substring(formatName.length()-3).contains("0")) isComplete = redisUtil.hget(FileConstant.FILE_UPLOAD_COMPLETE, fileUploadRequestDTO.getIdentifier() + "_" + formatName);
        else {
            // 对于非默认字符，每次重新上传，返回前端告诉未上传并进行分块upload
            // TODO(hjh) : 高级设置的自定义部分上锁很麻烦，暂时不考虑
//            double nowId = redisUtil.hincr(FileConstant.FILE_IS_ING, fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_" + fileUploadRequestDTO.getChunkNumber(), 1);
//            if (formatName.substring(formatName.length()-3).contains("0")) nowId = 1; // 对于自定义的编码，不进行上锁，各自只传一次
//            if (nowId != 1) {
//                // 控制多用户时候，由于重新上传，但是定义的名字仍然相同，需要等待上一个结束，因此提醒用户稍后
//                // 还有就是自己重复来了，那么也是一样的，重新进入，等待上一个结束
//                // 由于多加了一次，简单的减
//                redisUtil.hdecr(FileConstant.FILE_IS_ING, fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_" + fileUploadRequestDTO.getChunkNumber(), 1)
//
//            } else {
//
//            }
            return FileUploadDTO.builder().code(FileCheckMd5Status.FILE_NO_UPLOAD.getValue()).build();
        }
        // 生成临时文件，用于写入文件
        File temp = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier()
                + File.separator + formatName,
                "Csv_temp_" + fileUploadRequestDTO.getIdentifier() + ".csv");
        boolean needHeader = Boolean.parseBoolean(fileUploadRequestDTO.getFirstLineAsFields());
        if (isComplete == null) {
            // 要看看MD5是否存在
            // 配置文件
            File originConfFile = new File(sliceFolderPath + File.separator + fileUploadRequestDTO.getIdentifier(),
                    fileUploadRequestDTO.getIdentifier() + ".conf");
            // 当前编码的conf
            File confFile = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier() + File.separator + formatName,
                    fileUploadRequestDTO.getIdentifier() + ".conf");
            // 针对redis宕机情况
            if (temp.exists() && !redisUtil.hasKey(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName)) {
                temp.delete();
                if (confFile.exists()) {
                    confFile.delete();
                }
            }
            if (originConfFile.exists()) {
                if (!redisUtil.hasKey(FileConstant.FILE_UPLOAD_FILENAME + "_" + fileUploadRequestDTO.getIdentifier())) {
                    File origin = new File(sliceFolderPath + File.separator + fileUploadRequestDTO.getIdentifier());
                    FileUtils.deleteQuietly(origin);
                    return FileUploadDTO.builder().code(FileCheckMd5Status.FILE_NO_UPLOAD.getValue()).build();
                }
                // 写过conf, 查看写到哪
                // 比较每个块，对已经上传的块执行分块解析，然后返回没有上传块的块号
                try {
                    byte[] completeList = FileUtils.readFileToByteArray(originConfFile);
                    List<Integer> missChunkList = new LinkedList<>();
                    // 先对首快解析
                    FileChannel mFileChannel = new FileOutputStream(temp, true).getChannel();
                    File theFile;
                    List<String> headers;
                    double nowId = redisUtil.hincr(FileConstant.FILE_IS_ING, fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_" + -1, 1);
                    if (nowId == 1) {
                        theFile = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier(), FileConstant.FILE_ORIGIN + "_" + 0 + ".csv");
                        InputStreamReader isr;
                        BufferedReader br;
                        String charSet = fileUploadRequestDTO.getCharSet();
                        if ("utf-8".equals(charSet) || "Unicode".equals(charSet)) {
                            isr = new InputStreamReader(new FileInputStream(theFile), "UTF-8");
                        } else {
                            isr = new InputStreamReader(new FileInputStream(theFile), "GB2312");
                        }
                        br = new BufferedReader(isr);
                        CsvReader csvReader = new CsvReader(br, true, csvSemiotic);
                        headers = csvReader.getHeaders(true);
                        String res = String.join(csvReader.getSeparate(), headers);
                        // 获取缓冲区
                        ByteBuffer buf = ByteBuffer.allocateDirect(res.getBytes(StandardCharsets.UTF_8).length);
                        buf.clear();
                        // 存入数据到缓冲区
                        buf.put(res.getBytes(StandardCharsets.UTF_8));
                        buf.flip(); // 切换到读数据模式
                        while (buf.hasRemaining()) {
                            mFileChannel.write(buf);
                        }
                        if (headers == null) {
                            logger.error("获取标题失败");
                            throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_WRITE_FAIL, "获取标题失败");
                        }
                        redisUtil.lSet(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, headers, expireTime);
                        if (br != null) {
                            try {
                                br.close();
                            } catch (IOException e) {
                                logger.error("ParseFile call: " + e);
                            }
                        }

                        if (isr != null) {
                            try {
                                isr.close();
                            } catch (IOException e) {
                                logger.error("ParseFile call: " + e);
                            }
                        }
                    } else {
                        headers = (List<String>) redisUtil.blpop(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                        redisUtil.lSet(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, headers, expireTime);
                    }
                    // 分块并行解析
                    redisUtil.hdel(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                    // 多个tasks
                    List<ParseFile> tasks = new ArrayList<>();
                    // 添加执行线程
                    for (int i = 0; i < completeList.length; i++) {
                        if (completeList[i] != Byte.MAX_VALUE) {
                            // 部分块没写过
                            missChunkList.add(i);
                        } else {
                            nowId = redisUtil.hincr(FileConstant.FILE_IS_ING, fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_" + i, 1);
                            if (nowId == 1) {
                                theFile = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier(), FileConstant.FILE_ORIGIN + "_" + i + ".csv");
                                tasks.add(new ParseFile(fileUploadRequestDTO, theFile, i + 1, fileUploadRequestDTO.getCharSet(), sliceChunkSize,
                                        mFileChannel, csvSemiotic, redisUtil, needHeader, expireTime, sliceFolderPath, headers, formatName));
                            }
                        }
                    }

                    CompletableFuture.runAsync(() -> {
                        try {
                            getFinalResult(tasks, fileUploadRequestDTO, missChunkList, headers, true, formatName, csvSemiotic, mFileChannel);
                            mFileChannel.close();
                        } catch (ExecutionException e) {
                            logger.error("checkFileMd5 ExecutionException", e);
                            redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
//                            return FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build();
                        } catch (IOException e) {
                            logger.error("checkFileMd5 读取conf文件转byte失败", e);
                            redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
//                            return FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build();
                        } catch (InterruptedException e) {
                            logger.error("checkFileMd5 InterruptedException", e);
                            redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
                        }  catch (QueryTimeoutException e) {
                            logger.error("checkFileMd5 redis 超时获取，请重新执行操作");
                            redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
                        }
                    }, csvParsePool.getExecutor());

                    return FileUploadDTO.builder().code(FileCheckMd5Status.FILE_UPLOAD_SOME.getValue())
                            .missChunks(missChunkList).build();
                } catch (IOException e) {
                    logger.error("checkFileMd5 读取conf文件转byte失败", e);
                    deleteFileCatchInfo(fileUploadRequestDTO.getIdentifier());
                    return FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build();
                }
            }
            // 从未上传过, 返回前端告诉未上传并进行分块upload
            return FileUploadDTO.builder().code(FileCheckMd5Status.FILE_NO_UPLOAD.getValue()).build();
        }
        // 如果上传过，需要考虑是否上传结束，未结束那么断点续传，如果结束了则秒传
        if ((boolean) isComplete) {
            // 秒传提示
            // 也需要返回数预览信息
            FileUploadDTO result;

            try {
                List<PreviewDatasetVO> previewData = (List<PreviewDatasetVO>) redisUtil.blpop(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                redisUtil.lSet(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, previewData, expireTime);
                String fileNameExt = (String) redisUtil.get(FileConstant.FILE_UPLOAD_FILENAME + "_" + fileUploadRequestDTO.getIdentifier());
                // 合并所有分块文件
                ChannelExec channelExec = (ChannelExec) yamlSshSession.openChannel(CHANNEL_TYPE_EXEC);
                if (channelExec == null) {
                    throw new DataScienceException(
                            BaseErrorCode.DATASET_IMPORT_CONNECT_GP_SERVER_ERROR);
                }
                channelExec.setCommand(StrUtil.format("for ((i=0;i<{};i++))do echo {}$i.csv;done | xargs -i cat \\{} > {}", fileUploadRequestDTO.getTotalChunks(),
                        filepath + fileNameExt + "_",
                        filepath + fileNameExt + ".csv"));
                channelExec.connect();
                result = FileUploadDTO.builder().data(previewData).build();
            } catch (DataScienceException e) {
                logger.error("checkFileMd5 previewDataset Exception： ", e);
                deleteFileCatchInfo(fileUploadRequestDTO.getIdentifier());
                return FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build();
            } catch (JSchException e) {
                logger.error("checkFileMd5 previewDataset 合并文件失败", e);
                return FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build();
            }
            result.setCode(FileCheckMd5Status.FILE_UPLOADED.getValue());
            return result;
        } else {
            try {
                // 看原始conf文件，每位进行比较
                File originConfFile = new File(sliceFolderPath + File.separator + fileUploadRequestDTO.getIdentifier(),
                        fileUploadRequestDTO.getIdentifier() + ".conf");
                // 当前编码的conf
                File confFile = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier() + File.separator + formatName, fileUploadRequestDTO.getIdentifier() + ".conf");
                byte[] completeList = FileUtils.readFileToByteArray(confFile);
                byte[] originCompleteList = FileUtils.readFileToByteArray(originConfFile);
                List<Integer> missChunkList = new LinkedList<>();
                List<String> headers;
                headers = (List<String>) redisUtil.blpop(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                redisUtil.lSet(FileConstant.FILE_HEADER + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, headers, expireTime);

                FileChannel mFileChannel = new FileOutputStream(temp, true).getChannel();
                // 多个tasks
                redisUtil.hdel(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                List<ParseFile> tasks = new ArrayList<>();
                for (int i = 0; i < completeList.length; i++) {
                    if (completeList[i] != Byte.MAX_VALUE) {
                        if (originCompleteList[i] != Byte.MAX_VALUE) {
                            missChunkList.add(i);
                        } else {
                            double nowId = redisUtil.hincr(FileConstant.FILE_IS_ING, fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_" + i, 1);
                            if (nowId == 1) {
                                File theFile = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier(), FileConstant.FILE_ORIGIN + "_" + i + ".csv");
                                tasks.add(new ParseFile(fileUploadRequestDTO, theFile, i + 1, fileUploadRequestDTO.getCharSet(), sliceChunkSize,
                                        mFileChannel, csvSemiotic, redisUtil, needHeader, expireTime, sliceFolderPath, headers, formatName));
                            }
                        }
                    }
                }

                CompletableFuture.runAsync(() -> {
                    try {
                        getFinalResult(tasks, fileUploadRequestDTO, missChunkList, headers, false, formatName, csvSemiotic, mFileChannel);
                        mFileChannel.close();
                    } catch (ExecutionException e) {
                        logger.error("checkFileMd5 ExecutionException", e);
//                        return FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build();
                        redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
                    } catch (IOException e) {
                        logger.error("checkFileMd5 读取conf文件转byte失败", e);
//                        return FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build();
                        redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
                    } catch (InterruptedException e) {
                        logger.error("checkFileMd5 InterruptedException", e);
                        redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
                    }  catch (QueryTimeoutException e) {
                        logger.error("checkFileMd5 redis 超时获取，请重新执行操作");
                        redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
                    }
                }, csvParsePool.getExecutor());
                return FileUploadDTO.builder().code(FileCheckMd5Status.FILE_UPLOAD_SOME.getValue())
                        .missChunks(missChunkList).build();
            } catch (IOException e) {
                logger.error("checkFileMd5 读取conf文件转byte失败", e);
                deleteFileCatchInfo(fileUploadRequestDTO.getIdentifier());
                return FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build();
            }
        }
    }

    /**
     * Removes every cached artifact belonging to one upload: the on-disk slice
     * directory plus all redis entries keyed by the upload identifier.
     *
     * @param name upload identifier, used both as the slice directory name and
     *             as the suffix/field of the redis keys created during upload
     */
    public void deleteFileCatchInfo(String name) {
        // Drop the slice directory; deleteQuietly ignores a missing path.
        FileUtils.deleteQuietly(new File(sliceFolderPath + File.separatorChar + name));
        // Plain keys suffixed with the identifier.
        redisUtil.allDel(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + name);
        redisUtil.allDel(FileConstant.FILE_UPLOAD_FILENAME + "_" + name);
        redisUtil.allDel(FileConstant.FILE_HEADER + "_" + name);
        redisUtil.allDel("csvResult_" + name);
        // Hash-based progress entries for this upload.
        redisUtil.hdelAll(FileConstant.FILE_IS_ING, name);
        redisUtil.hdelAll(FileConstant.FILE_CHECK_PROGRESS, name);
        redisUtil.hdelAll(FileConstant.FILE_UPLOAD_COMPLETE, name);
        redisUtil.allDel(FileConstant.FILE_CHUNK_SEG_INFO + "_" + name);
    }

    /**
     * Aggregates the per-chunk CSV parse results for one upload, merges their error
     * statistics with any previously stored partial result, and publishes the outcome
     * to redis: either the final preview payload (all chunks present) or the list of
     * still-missing chunks.
     *
     * @param tasks parse tasks to execute now, one per chunk not yet parsed
     * @param fileUploadRequestDTO upload request carrying the identifier and chunk metadata
     * @param missChunkList indices of chunks whose original upload has not arrived yet
     * @param headers column headers resolved for this upload
     * @param isFirst true on the first aggregation pass (no partial result in redis yet)
     * @param formatName key suffix identifying the charset/separator/escape/quote combination
     * @param csvSemiotic CSV separator/quote/escape configuration used to re-parse boundary lines
     * @param mFileChannel channel of the merged temp CSV that re-parsed boundary rows are appended to
     * @throws InterruptedException if waiting on the parse tasks is interrupted
     * @throws ExecutionException if one of the parse tasks failed
     * @throws QueryTimeoutException if a redis operation times out
     */
    private void getFinalResult(List<ParseFile> tasks, FileUploadRequestDTO fileUploadRequestDTO, List<Integer> missChunkList, List<String> headers, boolean isFirst, String formatName, CsvSemiotic csvSemiotic, FileChannel mFileChannel) throws InterruptedException, ExecutionException, QueryTimeoutException {
        // Merged temp CSV location for this identifier/format combination.
        File temp = new File(sliceFolderPath + File.separatorChar + fileUploadRequestDTO.getIdentifier()
                + File.separator + formatName,
                "Csv_temp_" + fileUploadRequestDTO.getIdentifier() + ".csv");
        boolean needHeader = Boolean.parseBoolean(fileUploadRequestDTO.getFirstLineAsFields());
        // Submit all chunk-parse tasks and block until every one completes.
        List<Future<ParseInfo>> futures = null;

        futures = csvParsePool.getExecutor().invokeAll(tasks);

        // Collect and merge the execution results.
        int errorCount = 0;
        int successCnt = 0;
        Map<ErrorLineCode, ErrorLineCodeInfo> errorLineCodeInfoMap = new HashMap<>();
        if (!isFirst) {
            // Not the first pass: pop the previously stored partial result and seed the counters from it.
            CsvParseInfo csvParseInfo = (CsvParseInfo) redisUtil.blpop("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
            ErrorInfo errorInfo = csvParseInfo.getErrorInfo();
            errorCount = errorInfo.getErrorCount();
            successCnt = errorInfo.getSuccessCount();
            errorLineCodeInfoMap =
                    errorInfo.getErrorLineCodeInfoMap();
        }
        for (Future<ParseInfo> future : futures) {
            // Per-chunk parse result.
            ParseInfo partParseInfo = future.get();
            ErrorInfo partErrorInfo = partParseInfo.getErrorInfo();
            errorCount += partErrorInfo.getErrorCount();
            successCnt += partErrorInfo.getSuccessCount();
            Map<ErrorLineCode, ErrorLineCodeInfo> partErrorLineCodeInfoMap =
                    partErrorInfo.getErrorLineCodeInfoMap();
            // Merge the two error maps; on key collision concatenate the line lists and sum the counts.
            errorLineCodeInfoMap = Stream
                    .of(errorLineCodeInfoMap, partErrorLineCodeInfoMap)
                    .flatMap(x -> x.entrySet().stream())
                    .collect(
                            Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) ->
                                    new ErrorLineCodeInfo(Stream.of(v1.getLines(), v2.getLines())
                                            .flatMap(x -> x.stream()).collect(Collectors.toList()),
                                            v1.getCount() + v2.getCount())
                            ));
        }
        // Aggregated validation error result across all chunks processed so far.
        ErrorInfo errorInfo = ErrorInfo.builder()
                .errorCount(errorCount)
                .successCount(successCnt)
                .errorLineCodeInfoMap(errorLineCodeInfoMap)
                .build();

        // Persist the error summary together with the merged temp file path.
        CsvParseInfo csvParseInfo = CsvParseInfo
                .builder()
                .errorInfo(errorInfo)
                .path(temp.getAbsolutePath())
                .build();
        redisUtil.lSet("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, csvParseInfo, expireTime);


        if (missChunkList.size() == 0) {
            String fileNameExt = (String) redisUtil.get(FileConstant.FILE_UPLOAD_FILENAME + "_" + fileUploadRequestDTO.getIdentifier());
            try {
                // Concatenate all chunk files on the remote host into one CSV.
                ChannelExec channelExec = (ChannelExec) yamlSshSession.openChannel(CHANNEL_TYPE_EXEC);
                if (channelExec == null) {
                    throw new DataScienceException(
                            BaseErrorCode.DATASET_IMPORT_CONNECT_GP_SERVER_ERROR);
                }
                // NOTE(review): "\\{}" is hutool's escape for a literal "{}" (the xargs -i placeholder),
                // so the three remaining placeholders consume the three arguments — verify against StrUtil docs.
                channelExec.setCommand(StrUtil.format("for ((i=0;i<{};i++))do echo {}$i.csv;done | xargs -i cat \\{} > {}", fileUploadRequestDTO.getTotalChunks(),
                        filepath + fileNameExt + "_",
                        filepath + fileNameExt + ".csv"));
                // NOTE(review): channelExec is connected but never disconnected or awaited here —
                // presumably the remote cat finishes on its own; confirm channels do not leak on the session.
                channelExec.connect();
            } catch (JSchException e) {
                logger.error("合并文件失败", e);
                redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                        FileUploadDTO.builder().code(FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()).build(), expireTime);
            }
            if (fileUploadRequestDTO.getTotalChunks() > 1) {
                try {
                    // Stitch together the boundary fragments of adjacent chunks: the tail ("_end")
                    // of one chunk plus the head ("_begin") of the next form complete rows that no
                    // single chunk could parse on its own.
                    String line = "";
                    for (int i = 0; i < fileUploadRequestDTO.getTotalChunks(); i++) {
                        if (i == 0) {
                            line += (String) redisUtil.hget(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    i + "_end");
                        } else if (i != fileUploadRequestDTO.getTotalChunks() - 1) {
                            line += (String) redisUtil.hget(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    i + "_begin");
                            line += (String) redisUtil.hget(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    i + "_end");
                        } else {
                            line += (String) redisUtil.hget(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                                    i + "_begin");
                        }
                    }
                    // Re-parse the stitched boundary text with the same CSV semantics.
                    String[] text = line.split("\n");
                    CsvReader lineCsvReader = new CsvReader(text, csvSemiotic, headers.size());
                    int cnt = 0;
                    while (lineCsvReader.getState() != State.enddocument) {
                        List<String> record = lineCsvReader.readText();
                        if (record == null || record.size() == 0) continue;
                        // Append each successfully parsed row to the merged temp CSV.
                        cnt++;
                        String res = "\r\n" + String.join(lineCsvReader.getSeparate(), record);
                        // Direct buffer sized exactly for the UTF-8 payload.
                        ByteBuffer buf = ByteBuffer.allocateDirect(res.getBytes(StandardCharsets.UTF_8).length);
                        buf.clear();
                        // Fill the buffer with the row bytes.
                        buf.put(res.getBytes(StandardCharsets.UTF_8));
                        buf.flip(); // switch the buffer to read mode before writing it out
                        while(buf.hasRemaining()) {
                            mFileChannel.write(buf);
                        }
                    }
                    // Fold the boundary-line errors into the stored aggregate and re-publish it.
                    ErrorInfo lineErrorInfo = lineCsvReader.geterrorLineInfo();
                    errorLineCodeInfoMap = lineErrorInfo.getErrorLineCodeInfoMap();
                    csvParseInfo = (CsvParseInfo) redisUtil.blpop("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName);
                    ErrorInfo partErrorInfo = csvParseInfo.getErrorInfo();
                    errorCount = partErrorInfo.getErrorCount() + lineErrorInfo.getErrorCount();
                    successCnt = cnt + partErrorInfo.getSuccessCount();
                    Map<ErrorLineCode, ErrorLineCodeInfo> partErrorLineCodeInfoMap =
                            partErrorInfo.getErrorLineCodeInfoMap();
                    errorLineCodeInfoMap = Stream
                            .of(errorLineCodeInfoMap, partErrorLineCodeInfoMap)
                            .flatMap(x -> x.entrySet().stream())
                            .collect(
                                    Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) ->
                                            new ErrorLineCodeInfo(Stream.of(v1.getLines(), v2.getLines())
                                                    .flatMap(x -> x.stream()).collect(Collectors.toList()),
                                                    v1.getCount() + v2.getCount())
                                    ));
                    lineErrorInfo = ErrorInfo.builder()
                            .errorCount(errorCount)
                            .successCount(successCnt)
                            .errorLineCodeInfoMap(errorLineCodeInfoMap)
                            .build();
                    csvParseInfo = CsvParseInfo
                            .builder()
                            .errorInfo(lineErrorInfo)
                            .path(temp.getAbsolutePath())
                            .build();
                    redisUtil.lSet("csvResult_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, csvParseInfo, expireTime);
                } catch (IOException e) {
                    logger.error("写文件失败, ", e);
                }
            }

            // Gather the per-chunk preview rows into one list.
            List<Entity> FinalData = new LinkedList<>();
            for (int i = 0; i < fileUploadRequestDTO.getTotalChunks(); i++) {
                FinalData.addAll((List<Entity>)  redisUtil.blpop(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_"
                        + i));
            }
            List<HeadVO> heads = PreviewDatasetVO.toHead(headers, needHeader);
            FinalData = DatasetUtil.recommendDataType(heads, FinalData, DatasetConstant.DATA_PREVIEW_SIZE);
            PreviewDatasetVO parsingCVSFile = PreviewDatasetVO.builder()
                    .data(FinalData)
                    .head(heads)
                    .headers(headers)
                    .owner(JwtUtil.getCurrentUserDTO().getName()).build();
            String filename = Objects.requireNonNull(fileUploadRequestDTO.getFilename().toLowerCase());
            parsingCVSFile.setName(filename.substring(0, filename.lastIndexOf(".")));
            parsingCVSFile.setFileName(fileNameExt);
            List<PreviewDatasetVO> previewData = new ArrayList<>();
            previewData.add(parsingCVSFile);
            redisUtil.lSet(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName, previewData, expireTime);

            // Publish the final "uploaded" status with the preview payload.
            redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                    FileUploadDTO.builder().code(FileCheckMd5Status.FILE_UPLOADED.getValue())
                    .isComplete(true).data(previewData).build(), expireTime);
        } else {
            // Some chunks are still missing: publish the partial-upload status with the miss list.
            redisUtil.hset(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                    FileUploadDTO.builder().code(FileCheckMd5Status.FILE_UPLOAD_SOME.getValue())
                        .missChunks(missChunkList).build(), expireTime);
        }
    }

    /**
     * Records this chunk as written in the per-format .conf progress file, then checks
     * whether every chunk of the upload has been written for this format.
     * <p>
     * The .conf file holds one byte per chunk; {@link Byte#MAX_VALUE} marks a finished
     * chunk. ANDing all bytes therefore yields {@code Byte.MAX_VALUE} only when every
     * chunk is complete.
     *
     * @param fileUploadRequestDTO upload request carrying identifier, chunk number and total chunks
     * @param formatName key suffix identifying the charset/separator/escape/quote combination
     * @return true when all chunks have been written (the .conf file is then deleted)
     * @throws DataScienceException if the .conf file is missing or cannot be written
     */
    protected boolean setAndCheckConfFile(FileUploadRequestDTO fileUploadRequestDTO, String formatName) throws DataScienceException{
        try {
            // Per-format progress file under <sliceFolderPath>/<identifier>/<formatName>.
            File confFile = new File(sliceFolderPath + File.separator + fileUploadRequestDTO.getIdentifier() + File.separator + formatName,
                    fileUploadRequestDTO.getIdentifier() + ".conf");
            // try-with-resources: release the file handle even if a write fails.
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(confFile, "rw")) {
                randomAccessFile.setLength(fileUploadRequestDTO.getTotalChunks());
                randomAccessFile.seek(fileUploadRequestDTO.getChunkNumber());
                randomAccessFile.write(Byte.MAX_VALUE);
            }
            // AND all chunk bytes; any unfinished chunk (byte != MAX_VALUE) clears bits.
            byte[] completeList = FileUtils.readFileToByteArray(confFile);
            byte isComplete = Byte.MAX_VALUE;
            for (int i = 0; i < completeList.length && isComplete == Byte.MAX_VALUE; i++) {
                isComplete = (byte) (isComplete & completeList[i]);
            }

            if (isComplete == Byte.MAX_VALUE) {
                // All chunks written: flag completion in redis and drop the progress file.
                redisUtil.hset(FileConstant.FILE_UPLOAD_COMPLETE, fileUploadRequestDTO.getIdentifier() + "_" + formatName, true, expireTime);
                confFile.delete();
                return true;
            } else if (!redisUtil.hHasKey(FileConstant.FILE_UPLOAD_COMPLETE, fileUploadRequestDTO.getIdentifier() + "_" + formatName)) {
                // First time we see this upload/format: initialise the completion flag to false.
                redisUtil.hset(FileConstant.FILE_UPLOAD_COMPLETE, fileUploadRequestDTO.getIdentifier() + "_" + formatName, false, expireTime);
            }
            return false;
        } catch (FileNotFoundException e) {
            logger.error("slice upload setAndCheckConfFile failed, FileNotFoundException: ", e);
            throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_NOT_FOUND, "服务器中未找到conf文件");
        } catch (IOException e) {
            logger.error("slice upload setAndCheckConfFile failed, IOException: ", e);
            throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_WRITE_FAIL, "写conf文件失败");
        }
    }

    /**
     * Records this chunk as uploaded in the original file's .conf progress file
     * (one byte per chunk; {@link Byte#MAX_VALUE} marks a finished chunk).
     *
     * @param fileUploadRequestDTO upload request carrying identifier, chunk number and total chunks
     * @throws DataScienceException if the .conf file cannot be written
     */
    protected void setConfFile(FileUploadRequestDTO fileUploadRequestDTO) throws DataScienceException{
        try {
            // Progress file for the original upload: <sliceFolderPath>/<identifier>/<identifier>.conf
            File confFile = new File(sliceFolderPath + File.separator + fileUploadRequestDTO.getIdentifier(),
                    fileUploadRequestDTO.getIdentifier() + ".conf");
            // try-with-resources: release the file handle even if a write fails.
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(confFile, "rw")) {
                randomAccessFile.setLength(fileUploadRequestDTO.getTotalChunks());
                randomAccessFile.seek(fileUploadRequestDTO.getChunkNumber());
                randomAccessFile.write(Byte.MAX_VALUE);
            }
        } catch (IOException e) {
            logger.error("slice upload setConfFile failed, IOException: ", e);
            throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_WRITE_FAIL, "写conf文件失败");
        }
    }

    /**
     * Copies an uploaded MultipartFile into the slice folder as
     * {@code <fileNameWithoutExt>.csv}.
     *
     * @param file the uploaded part; ignored when its name is empty or its size is 0
     * @param fileNameWithoutExt target path (relative to the slice folder) without extension
     * @return the written File, or null when the upload is empty; on a copy failure the
     *         partially written File reference is still returned (logged, not rethrown)
     */
    public File multipartFileToFile(MultipartFile file, String fileNameWithoutExt) {
        File toFile = null;
        if (!file.getName().equals("") && file.getSize() > 0) {
            // try-with-resources closes the stream even when inputStreamToFile throws.
            try (InputStream ins = file.getInputStream()) {
                toFile = new File(sliceFolderPath + File.separatorChar + fileNameWithoutExt + ".csv");
                inputStreamToFile(ins, toFile);
            } catch (Exception e) {
                // Best-effort copy: failures are logged and surfaced via the return value.
                logger.error("multipartFileToFile: ", e);
            }
        }
        return toFile;
    }

    /**
     * Handles one chunk of a sliced upload: persists the chunk, updates the progress
     * .conf file, previews the chunk, and reports whether the whole file is complete.
     *
     * @param fileUploadRequestDTO upload request carrying the chunk, identifier and CSV settings
     * @return completion status: the full preview result when all chunks are written,
     *         otherwise the number of the chunk just completed
     * @throws DataScienceException if the chunk is empty, another upload of the same
     *         chunk is in flight, or preview/conf handling fails
     */
    public FileUploadDTO sliceUpload(FileUploadRequestDTO fileUploadRequestDTO) {
        // Reject empty chunks up front.
        if (fileUploadRequestDTO.getFile().getSize() == 0) {
            logger.warn("API /dataset/sliceUpload failed, since uploading file's size is 0");
            throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_SIZE_ZERO, "文件大小为0，请重新上传");
        }
        multipartFileToFile(fileUploadRequestDTO.getFile(),
                fileUploadRequestDTO.getIdentifier() + File.separatorChar + FileConstant.FILE_ORIGIN + "_" + fileUploadRequestDTO.getChunkNumber());
        setConfFile(fileUploadRequestDTO);
        CsvSemiotic csvSemiotic = new CsvSemiotic(fileUploadRequestDTO.getSeparate(),
                fileUploadRequestDTO.getQuote(),
                fileUploadRequestDTO.getEscape());
        // The format name identifies this charset/separator/escape/quote combination; it keys
        // both the per-format .conf file and the redis-based in-flight lock below.
        String formatName = generateFileDifferentFormatName(fileUploadRequestDTO.getCharSet(), csvSemiotic);
        double nowId = 1;
        // NOTE(review): the last-3-chars "0" check presumably distinguishes default vs custom
        // format settings — confirm against generateFileDifferentFormatName's value encoding.
        if (!formatName.substring(formatName.length() - 3).contains("0")) {
            // hincr acts as a lock: only the first caller for this chunk/format gets id 1.
            nowId = redisUtil.hincr(FileConstant.FILE_IS_ING, fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_" + fileUploadRequestDTO.getChunkNumber(), 1);
        }
        if (nowId != 1) {
            throw DataScienceException.of(BaseErrorCode.SLICE_NOW_UPLOAD, "请等待文件上传完毕");
        }
        // Preview the chunk; a preview failure aborts the whole request.
        FileUploadDTO result = previewDataset(fileUploadRequestDTO);
        // Update the per-format .conf file after the chunk has been parsed.
        boolean isEnd = setAndCheckConfFile(fileUploadRequestDTO, formatName);
        if (isEnd) {
            // All chunks written: return the full preview result.
            result.setComplete(true);
            result.setIdentifier(fileUploadRequestDTO.getIdentifier());
            return result;
        }
        // Not finished yet: tell the caller which chunk just completed.
        return FileUploadDTO.builder()
                .completeChunk(fileUploadRequestDTO.getChunkNumber())
                .isComplete(false)
                .identifier(fileUploadRequestDTO.getIdentifier())
                .build();
    }

    /**
     * Parses the uploaded file (CSV or Excel) and builds preview data with recommended
     * column types. Excel files are additionally imported to GP asynchronously.
     *
     * @param fileUploadRequestDTO upload request carrying the file, filename and CSV settings
     * @return preview rows plus any parser tips
     * @throws DataScienceException on parse failure, I/O failure, or empty preview data
     */
    public FileUploadDTO previewDataset(FileUploadRequestDTO fileUploadRequestDTO) throws DataScienceException{
        // Whether the first line of the file supplies the column headers.
        boolean needHeader = Boolean.parseBoolean(fileUploadRequestDTO.getFirstLineAsFields());
        // Preview result: exactly one entry for CSV, possibly several sheets for Excel.
        List<PreviewDatasetVO> previewDataSet = null;
        String tips = null;
        // Normalise symbolic separator names sent by the front end.
        if ("tab".equals(fileUploadRequestDTO.getSeparate())) {
            fileUploadRequestDTO.setSeparate("\t");
        } else if ("space".equals(fileUploadRequestDTO.getSeparate())) {
            fileUploadRequestDTO.setSeparate(" ");
        }
        MultipartFile multipartFile = fileUploadRequestDTO.getFile();

        CsvSemiotic csvSemiotic = new CsvSemiotic(fileUploadRequestDTO.getSeparate(),
                fileUploadRequestDTO.getQuote(),
                fileUploadRequestDTO.getEscape());
        String name = Objects.requireNonNull(fileUploadRequestDTO.getFilename().toLowerCase());
        // Excel input is converted with fixed comma/double-quote CSV semantics.
        if (name.endsWith(".xls") || name.endsWith(".xlsx")) {
            csvSemiotic = new CsvSemiotic(",", "\"", "\"");
        }
        String formatName = generateFileDifferentFormatName(fileUploadRequestDTO.getCharSet(), csvSemiotic);
        // Parse and preview the file.
        try {
            String tempFileName = generateFileName(name);
            PreviewDatasetListVO previewDatasetListVO = uploadFiles(fileUploadRequestDTO, tempFileName,
                    fileUploadRequestDTO.getCharSet(), csvSemiotic, formatName);
            if (name.endsWith(".xls") || name.endsWith(".xlsx")) {
                // multipartFile is effectively final, so the lambda can capture it directly.
                CompletableFuture.runAsync(() -> {
                    importExcelToGP(multipartFile, needHeader, tempFileName);
                }, threadPool.getExecutor());
            }
            previewDataSet = previewDatasetListVO.getPreviewDatasetVOS();
            tips = previewDatasetListVO.getTips();

        } catch (POIXMLException e) {
            logger.warn("API /dataset/uploadFiles failed, since {}", e.getMessage());
            throw new DataScienceException(ApiResultCode.EXCEL_TYPE_ERROR, e);
        } catch (DataScienceException e) {
            logger.warn("API /dataset/uploadFiles failed, since {}", e.getMessage());
            throw e;
        } catch (IOException e) {
            logger.error("API /dataset/uploadFiles failed, since {}", e.getMessage(), e);
            throw new DataScienceException(BaseErrorCode.UPLOAD_FILE_ERROR, e);
        }
        // Recommend semantic types for the preview columns (best-effort).
        try {
            for (PreviewDatasetVO previewData : previewDataSet) {
                semanticService.recommendSemantic(previewData.getHead(), previewData.getData());
            }
        } catch (Exception e) {
            // Pass the throwable so the stack trace is logged, not just the message.
            logger.error("API /dataset/uploadFiles recommend semantic type failed, since {}", e.getMessage(), e);
        }
        if (CollectionUtil.isEmpty(previewDataSet)) {
            logger.warn("API /dataset/uploadFiles -> preview dataSet empty");
            throw new DataScienceException(BaseErrorCode.UPLOAD_PREVIEW_DATA_ERROR);
        }
        return FileUploadDTO.builder().data(previewDataSet).tips(tips).build();
    }

    /**
     * Builds the key suffix that uniquely identifies one parse configuration:
     * charset followed by the separator, escape, and quote codes.
     *
     * @param charSet character set of the upload
     * @param csvSemiotic CSV separator/quote/escape configuration
     * @return the concatenated format name
     */
    private String generateFileDifferentFormatName(String charSet, CsvSemiotic csvSemiotic) {
        StringBuilder formatName = new StringBuilder(charSet);
        formatName.append(csvSemiotic.getSeparate().getValue());
        formatName.append(csvSemiotic.getEscape().getValue());
        formatName.append(csvSemiotic.getQuote().getValue());
        return formatName.toString();
    }

    /**
     * Reports the parse progress for an upload/format combination. While parsing is in
     * flight the redis hash holds an Integer counter; once finished it holds the final
     * FileUploadDTO, which is cleaned up (on failure, all cached artifacts are removed).
     *
     * @param fileUploadRequestDTO upload request carrying identifier and CSV settings
     * @return progress DTO (0 when no entry exists), the final result DTO, or null
     *         when the stored payload has an unexpected type
     */
    public FileUploadDTO checkProgress(FileUploadRequestDTO fileUploadRequestDTO) {
        CsvSemiotic csvSemiotic = new CsvSemiotic(fileUploadRequestDTO.getSeparate(),
            fileUploadRequestDTO.getQuote(),
            fileUploadRequestDTO.getEscape());
        String formatName = generateFileDifferentFormatName(fileUploadRequestDTO.getCharSet(), csvSemiotic);
        String progressKey = fileUploadRequestDTO.getIdentifier() + "_" + formatName;
        if (!redisUtil.hHasKey(FileConstant.FILE_CHECK_PROGRESS, progressKey)) {
            return FileUploadDTO.builder().progress(0).build();
        }
        Object result = redisUtil.hget(FileConstant.FILE_CHECK_PROGRESS, progressKey);
        // In-flight parses store an Integer counter; use instanceof instead of
        // catching ClassCastException for control flow.
        if (result instanceof Integer) {
            return FileUploadDTO.builder().progress((Integer) result).build();
        }
        if (result instanceof FileUploadDTO) {
            FileUploadDTO res = (FileUploadDTO) result;
            if (res.getCode() == FileCheckMd5Status.CHECK_PROCESS_FAIL.getValue()) {
                // Failed parse: purge every cached artifact for this upload.
                deleteFileCatchInfo(fileUploadRequestDTO.getIdentifier());
            } else {
                // Successful parse: drop only the progress entry.
                redisUtil.hdel(FileConstant.FILE_CHECK_PROGRESS, progressKey);
            }
            return res;
        }
        // Unexpected payload type; the original return-in-finally silently yielded null here.
        return null;
    }
}